Attribute support and multi-threading for Pyro
John Wiegley
jwiegley at inprise.com
Fri May 19 15:10:08 EDT 2000
The following diffs will add remote attribute getting/setting support
to Pyro, as well as threading (pass "threaded = 1" to
Pyro.core.Daemon).
These were sent to the author, but he must be very busy, and I would
like further comments by anyone else who is using Pyro.
These diffs (which can be merged with your sources using 'patch') are
against the 1.1 package, which was announced recently.
----------------------------------------------------------------------
--- Pyro/core.py Mon May 8 13:11:21 2000
+++ Pyro/core.py Fri May 19 11:55:43 2000
@@ -52,6 +52,23 @@
args=args[:-1]+args[-1]
return apply(getattr(self.delegate,method),args,keywords)
+class ObjBaseWithAttrs(ObjBase):
+ def remote_hasattr(self, attr):
+ attr = getattr(self, attr)
+ if attr is not None:
+ from types import MethodType, BuiltinMethodType
+ if type(attr) in (MethodType, BuiltinMethodType):
+ return 'method'
+ else:
+ return 'attr'
+ return 0
+
+ def remote_getattr(self, attr):
+ return getattr(self, attr)
+
+ def remote_setattr(self, attr, value):
+ return setattr(self, attr, value)
+
#############################################################################
#
@@ -147,10 +164,13 @@
def __init__(self, URI):
self.URI = URI
self.objectID = URI.objectID
- self.adapter = Pyro.protocol.getProtocolAdapter(self.URI.protocol)
- self.adapter.bindToURI(URI)
+ self.adapter = None
def __getattr__(self, name):
+ # allows one of these to be safely pickled
+ if name != '__getinitargs__':
self._name=name; return self.__invokePYRO__
+ else:
+ raise AttributeError()
def __repr__(self):
return '<Pyro.core.DynamicProxy instance at '+str(id(self))+'>'
def __str__(self):
@@ -159,10 +179,67 @@
# Note that a slightly faster way of calling is this:
# instead of proxy.method(args...) use proxy('method',args...)
def __call__(self,method,*vargs, **kargs):
- return self.adapter.remoteInvocation(method,RIF_Varargs|RIF_Keywords,vargs,kargs)
+ self._name = method
+ return self.__invokePYRO__
def __invokePYRO__(self, *vargs, **kargs):
- return self.adapter.remoteInvocation(self._name,RIF_Varargs|RIF_Keywords,vargs,kargs)
+ if self.adapter is None:
+ self.adapter = Pyro.protocol.getProtocolAdapter(self.URI.protocol)
+ self.adapter.bindToURI(self.URI)
+ return self.adapter.remoteInvocation(self._name,
+ RIF_Varargs|RIF_Keywords,
+ vargs,kargs)
+ def __getstate__(self):
+ temp = {}
+ for key in self.__dict__.keys():
+ if key != "adapter":
+ temp[key] = self.__dict__[key]
+ return temp
+
+ def __setstate__(self, value):
+ for key in value.keys():
+ self.__dict__[key] = value[key]
+ self.__dict__['adapter'] = None
+class DynamicProxyWithAttrs(DynamicProxy):
+ def __init__(self, URI):
+ self.attr_cache = {}
+ DynamicProxy.__init__(self, URI)
+
+ def remote_getattr(self, attr, value = 0):
+ if value: meth = 'remote_getattr'
+ else: meth = 'remote_hasattr'
+ self._name = meth
+ return self.__invokePYRO__(attr)
+
+ def findattr(self, attr):
+ if self.attr_cache.has_key(attr):
+ return self.attr_cache[attr]
+
+ # Go look it up, and cache the value
+ self.attr_cache[attr] = self.remote_getattr(attr)
+ return self.attr_cache[attr]
+
+ def __setattr__(self, attr, value):
+ result = self.findattr(attr)
+ if result == 'attr':
+ self._name = 'remote_setattr'
+ return self.__invokePYRO__(attr, value)
+ else:
+ raise AttributeError()
+
+ def __getattr__(self, attr):
+ # allows one of these to be safely pickled
+ if attr != '__getinitargs__':
+ result = self.findattr(attr)
+ if result == 'method':
+ self._name = name
+ return self.__invokePYRO__
+ elif result is not None:
+ return self.remote_getattr(attr, 1)
+ else:
+ raise AttributeError()
+ else:
+ raise AttributeError()
#############################################################################
#
@@ -173,7 +250,7 @@
#############################################################################
class Daemon(Pyro.protocol.TCPServer):
- def __init__(self,protocol='PYRO',port=0):
+ def __init__(self,protocol='PYRO',port=0,threaded=0):
self.hostname = Pyro.protocol.getHostname()
if port:
self.port = port
@@ -185,10 +262,12 @@
self.adapter = Pyro.protocol.getProtocolAdapter(protocol)
self.adapter.setDaemon(self)
try:
- Pyro.protocol.TCPServer.__init__(self, DaemonSlave(), self.port)
- except socket.error:
- Log.error('Daemon','Couldn\'t start Pyro daemon- already running?')
- raise DaemonError('Couldn\'t start Pyro daemon- perhaps it\'s running already?')
+ Pyro.protocol.TCPServer.__init__(self, DaemonSlave(), self.port,
+ threaded)
+ except socket.error, msg:
+ text = 'Couldn\'t start Pyro daemon: ' + str(msg)
+ Log.error('Daemon', text)
+ raise DaemonError(text)
def __del__(self):
# server shutting down, unregister all known objects in the NS
@@ -219,10 +298,13 @@
self.implementations[object.GUID()]=(object,name)
# register the object with the NS
if self.NameServer:
- self.NameServer.register(name,PyroURI(self.hostname,object.GUID(),
- protocol=self.protocol,port=self.port))
+ URI = PyroURI(self.hostname,object.GUID(),
+ protocol=self.protocol,port=self.port)
+ self.NameServer.register(name, URI)
+ return URI
else:
Log.warn('Daemon','connecting object without naming service specified:',name)
+ return None
def disconnect(self,object):
try:
@@ -303,4 +385,3 @@
_initGeneric_post()
if banner:
print 'Pyro Server Initialized. Using Pyro V'+Pyro.PYRO_VERSION
-
--- Pyro/protocol.py Sun May 7 13:40:35 2000
+++ Pyro/protocol.py Fri May 19 11:59:05 2000
@@ -7,8 +7,9 @@
#
#############################################################################
-import select, socket, struct
+import select, socket, struct, time
import Pyro
+from threading import Thread
from Pyro.util import pickle, Log
from Pyro.errors import *
@@ -176,7 +177,7 @@
#-------- TCPServer base class
class TCPServer:
- def __init__(self, requestServer, port):
+ def __init__(self, requestServer, port, threaded):
self.slave = requestServer
self.slave.daemon=self
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -184,6 +185,8 @@
self.sock.listen(5)
self.connections = []
self.setParamsForLoop(5)
+ self.threaded = threaded
+ self.handling = []
def __del__(self):
if len(self.connections)>0:
Log.warn('TCPServer','Shutting down but there are still',len(self.connections),'active connections')
@@ -202,30 +205,51 @@
self.adapter.sendAccept(csock)
self.connections.append(conn)
Log.msg('TCPServer','new connection from',addr,'#conns=',len(self.connections))
+ return conn
else:
# we have too many open connections. Disconnect this one.
Log.msg('TCPServer','Too many open connections, closing',addr,'#conns=',len(self.connections))
self.adapter.sendDeny(csock)
+ return None
+
+ def handleRequest(self, c):
+ try:
+ time.sleep(.001)
+ self.slave.handleRequest(c)
+ except:
+ self.handleError(c)
+ if self.threaded and c in self.handling:
+ self.handling.remove(c)
def handleRequests(self, timeout=None, others=[], callback=None):
activecnt=1
while activecnt:
- socklist = self.connections+[self.sock]+others
+ connections = []
+ for conn in self.connections:
+ if conn not in self.handling:
+ connections.append(conn)
+ socklist = connections+[self.sock]+others
+ time.sleep(.001)
if timeout==None:
ins,outs,exs = select.select(socklist,[],[])
else:
ins,outs,exs = select.select(socklist,[],[],timeout)
activecnt=len(ins)
+ time.sleep(.001)
if self.sock in ins:
- self.newConnection(self.sock)
+ conn = self.newConnection(self.sock)
ins.remove(self.sock)
+ if conn: ins.append(conn)
+ time.sleep(.001)
for c in ins:
if isinstance(c,TCPConnection):
- try:
- self.slave.handleRequest(c)
- except:
- self.handleError(c)
+ if self.threaded:
+ self.handling.append(c)
+ Thread(target=self.handleRequest, args=(c,)).start()
+ else:
+ self.handleRequest(c)
ins.remove(c)
+ time.sleep(.001)
if ins and callback:
# the 'others' must have fired...
callback(ins)
@@ -243,5 +267,8 @@
def removeConnection(self, conn):
if conn in self.connections:
self.connections.remove(conn)
- Log.msg('TCPServer','removed connection with',conn.addr,' #conns=',len(self.connections))
+ if self.threaded and conn in self.handling:
+ self.handling.remove(conn)
+ Log.msg('TCPServer','removed connection with',conn.addr,
+ ' #conns=',len(self.connections))
More information about the Python-list
mailing list