[py-svn] r45537 - py/branch/lessthread/py/execnet

hpk at codespeak.net hpk at codespeak.net
Tue Aug 7 18:53:00 CEST 2007


Author: hpk
Date: Tue Aug  7 18:52:59 2007
New Revision: 45537

Modified:
   py/branch/lessthread/py/execnet/gateway.py
   py/branch/lessthread/py/execnet/register.py
Log:
(fijal,hpk) 

for now remove possibility to allow execution locally 
(i.e. if "A" instantiates a gateway on "B" then B may
not execute code on A). 

fix an issue with dangling processes with "remote_init_threads" 

instantiate receiver thread and requestqueue from _servemain()
instantiate receiver thread from local gateway initialiaztion 



Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py	(original)
+++ py/branch/lessthread/py/execnet/gateway.py	Tue Aug  7 18:52:59 2007
@@ -32,9 +32,10 @@
 
 class Gateway(object):
     _ThreadOut = ThreadOut 
-    _requestqueue = None
     remoteaddress = ""
-    def __init__(self, io, allowexec=False, _startcount=2): 
+    _requestqueue = None
+
+    def __init__(self, io, _startcount=2): 
         """ initialize core gateway, using the given 
             inputoutput object. 
         """
@@ -45,7 +46,9 @@
             atexit.register(cleanup_atexit)
             registered_cleanup = True
         _activegateways[self] = True
-        if allowexec:
+
+    def _initreceive(self, requestqueue=False):
+        if requestqueue: 
             self._requestqueue = Queue.Queue()
         self._receiverthread = threading.Thread(name="receiver", 
                                  target=self._thread_receiver)
@@ -70,9 +73,6 @@
         return "<%s%s %s/%s (%s active channels)>" %(
                 self.__class__.__name__, addr, r, s, i)
 
-##    def _local_trystopexec(self):
-##        self._execpool.shutdown() 
-
     def _trace(self, *args):
         if debug:
             try:
@@ -156,6 +156,7 @@
 
     def _servemain(self, joining=True):
         from sys import exc_info
+        self._initreceive(requestqueue=True)
         try:
             while 1:
                 item = self._requestqueue.get()
@@ -185,7 +186,9 @@
             while 1:
                 task = gw._requestqueue.get()
                 if task is None:
+                    gw._stopsend()
                     execpool.shutdown()
+                    execpool.join()
                     raise StopExecLoop
                 execpool.dispatch(gw._executetask, task)
         """ % num)

Modified: py/branch/lessthread/py/execnet/register.py
==============================================================================
--- py/branch/lessthread/py/execnet/register.py	(original)
+++ py/branch/lessthread/py/execnet/register.py	Tue Aug  7 18:52:59 2007
@@ -28,6 +28,8 @@
     def __init__(self, io):
         self._remote_bootstrap_gateway(io)
         super(InstallableGateway, self).__init__(io=io, _startcount=1) 
+        # XXX we dissallow execution form the other side
+        self._initreceive(requestqueue=False) 
 
     def _remote_bootstrap_gateway(self, io, extra=''):
         """ return Gateway with a asynchronously remotely
@@ -40,7 +42,7 @@
         bootstrap = [extra]
         bootstrap += [getsource(x) for x in startup_modules]
         bootstrap += [io.server_stmt, 
-                      "Gateway(io=io, allowexec=True, _startcount=2)._servemain()", 
+                      "Gateway(io=io, _startcount=2)._servemain()", 
                      ]
         source = "\n".join(bootstrap)
         self._trace("sending gateway bootstrap code")



More information about the pytest-commit mailing list