[py-svn] r39992 - py/branch/lessthread/py/execnet

hpk at codespeak.net hpk at codespeak.net
Tue Mar 6 18:35:03 CET 2007


Author: hpk
Date: Tue Mar  6 18:35:02 2007
New Revision: 39992

Modified:
   py/branch/lessthread/py/execnet/gateway.py
   py/branch/lessthread/py/execnet/register.py
Log:
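Remove execution threads by default: when 'execthreads' is None (the
default), exec requests are queued and served by Gateway._servemain()
rather than dispatched to a WorkerPool; the remote bootstrap code now
calls _servemain().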


Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py	(original)
+++ py/branch/lessthread/py/execnet/gateway.py	Tue Mar  6 18:35:02 2007
@@ -22,7 +22,7 @@
     from py.__.execnet.channel import ChannelFactory, Channel
     from py.__.execnet.message import Message
     ThreadOut = py._thread.ThreadOut 
-    WorkerPool = py._thread.WorkerPool 
+    WorkerPool = py._thread.WorkerPool
 
 import os
 debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
@@ -31,20 +31,29 @@
 
 class Gateway(object):
     _ThreadOut = ThreadOut 
+    _workerpool = None
     remoteaddress = ""
     def __init__(self, io, execthreads=None, _startcount=2): 
         """ initialize core gateway, using the given 
             inputoutput object and 'execthreads' execution
-            threads. 
+            threads.  if 'execthreads' is -1, no execution
+            will be allowed on this side.  If it is None,
+            requests will be queued and _servemain() may
+            be used to execute them. 
         """
         global registered_cleanup, _activegateways
-        self._execpool = WorkerPool(maxthreads=execthreads)
         self._io = io
         self._channelfactory = ChannelFactory(self, _startcount)
         if not registered_cleanup:
             atexit.register(cleanup_atexit)
             registered_cleanup = True
         _activegateways[self] = True
+        if execthreads != -1:
+            if execthreads is not None: 
+                self._workerpool = WorkerPool(maxthreads=execthreads)
+            else:
+                self._requestqueue = Queue.Queue()
+            
         self._receiverthread = threading.Thread(name="receiver", 
                                  target=self._thread_receiver)
         self._receiverthread.setDaemon(0)
@@ -110,7 +119,8 @@
                     self._traceex(exc_info())
                     break 
         finally:
-            self._send(None)
+            self._stopexec()
+            self._stopsend()
             self._channelfactory._finished_receiving()
             self._trace('leaving %r' % threading.currentThread())
 
@@ -143,10 +153,31 @@
                 channel.close() 
         return close 
 
-    def _thread_executor(self, channel, (source, outid, errid)):
-        """ worker thread to execute source objects from the execution queue. """
+    def _local_schedulexec(self, channel, sourcetask):
+        if self._workerpool:
+            self._workerpool.dispatch(self._executetask, channel, sourcetask)
+        elif hasattr(self, '_requestqueue'): 
+            self._requestqueue.put((channel, sourcetask)) 
+
+    def _servemain(self, joining=True):
         from sys import exc_info
         try:
+            while 1:
+                item = self._requestqueue.get()
+                if item is None:
+                    self._stopsend()
+                    break
+                self._executetask(item) # could be done in an exec thread 
+        finally:
+            self._trace("_servemain finished") 
+        if self.joining:
+            self.join()
+
+    def _executetask(self, item):
+        """ execute channel/source items. """
+        from sys import exc_info
+        channel, (source, outid, errid) = item 
+        try:
             loc = { 'channel' : channel }
             self._trace("execution starts:", repr(source)[:50])
             close = self._local_redirect_thread_output(outid, errid) 
@@ -168,10 +199,6 @@
         else:
             channel.close()
 
-    def _local_schedulexec(self, channel, sourcetask): 
-        self._trace("dispatching exec")
-        self._execpool.dispatch(self._thread_executor, channel, sourcetask) 
-
     def _newredirectchannelid(self, callback): 
         if callback is None: 
             return  
@@ -245,8 +272,18 @@
         return Handle()
 
     def exit(self):
-        """ Try to stop all IO activity. """
-        self._send(None) 
+        """ Try to stop all exec and IO activity. """
+        self._stopexec()
+        self._stopsend()
+
+    def _stopsend(self):
+        self._send(None)
+
+    def _stopexec(self):
+        if hasattr(self, '_requestqueue'):
+            self._requestqueue.put(None)
+        if self._workerpool:
+            self._workerpool.shutdown()
 
     def join(self, joinexec=True):
         """ Wait for all IO (and by default all execution activity) 
@@ -257,8 +294,9 @@
             self._trace("joining receiver thread")
             self._receiverthread.join()
         if joinexec: 
-            self._execpool.join()
-            self._trace("joining execution threads finished, current %r" % current) 
+            if self._workerpool:
+                self._workerpool.join()
+                self._trace("joining execution threads finished, current %r" % current) 
 
 def getid(gw, cache={}):
     name = gw.__class__.__name__

Modified: py/branch/lessthread/py/execnet/register.py
==============================================================================
--- py/branch/lessthread/py/execnet/register.py	(original)
+++ py/branch/lessthread/py/execnet/register.py	Tue Mar  6 18:35:02 2007
@@ -41,7 +41,7 @@
         bootstrap = [extra]
         bootstrap += [getsource(x) for x in startup_modules]
         bootstrap += [io.server_stmt, 
-                      "Gateway(io=io, _startcount=2).join(joinexec=False)",
+                      "Gateway(io=io, _startcount=2)._servemain()", 
                      ]
         source = "\n".join(bootstrap)
         self._trace("sending gateway bootstrap code")



More information about the pytest-commit mailing list