[py-svn] r63136 - py/trunk/py/execnet

hpk at codespeak.net hpk at codespeak.net
Fri Mar 20 14:31:03 CET 2009


Author: hpk
Date: Fri Mar 20 14:31:02 2009
New Revision: 63136

Modified:
   py/trunk/py/execnet/gateway.py
Log:
better grouping of gateway public API 


Modified: py/trunk/py/execnet/gateway.py
==============================================================================
--- py/trunk/py/execnet/gateway.py	(original)
+++ py/trunk/py/execnet/gateway.py	Fri Mar 20 14:31:02 2009
@@ -202,28 +202,6 @@
         if joining:
             self.join()
 
-    def remote_init_threads(self, num=None):
-        """ start up to 'num' threads for subsequent 
-            remote_exec() invocations to allow concurrent
-            execution. 
-        """
-        if hasattr(self, '_remotechannelthread'):
-            raise IOError("remote threads already running")
-        from py.__.thread import pool
-        source = py.code.Source(pool, """
-            execpool = WorkerPool(maxthreads=%r)
-            gw = channel.gateway
-            while 1:
-                task = gw._requestqueue.get()
-                if task is None:
-                    gw._stopsend()
-                    execpool.shutdown()
-                    execpool.join()
-                    raise gw._StopExecLoop
-                execpool.dispatch(gw._executetask, task)
-        """ % num)
-        self._remotechannelthread = self.remote_exec(source)
-
     def _executetask(self, item):
         """ execute channel/source items. """
         from sys import exc_info
@@ -268,10 +246,6 @@
     # High Level Interface
     # _____________________________________________________________________
     #
-    def newchannel(self): 
-        """ return new channel object.  """ 
-        return self._channelfactory.new()
-
     def remote_exec(self, source, stdout=None, stderr=None): 
         """ return channel object and connect it to a remote
             execution thread where the given 'source' executes
@@ -295,6 +269,53 @@
                     channel.id, (source, outid, errid)))
         return channel 
 
+    def remote_init_threads(self, num=None):
+        """ start up to 'num' threads for subsequent 
+            remote_exec() invocations to allow concurrent
+            execution. 
+        """
+        if hasattr(self, '_remotechannelthread'):
+            raise IOError("remote threads already running")
+        from py.__.thread import pool
+        source = py.code.Source(pool, """
+            execpool = WorkerPool(maxthreads=%r)
+            gw = channel.gateway
+            while 1:
+                task = gw._requestqueue.get()
+                if task is None:
+                    gw._stopsend()
+                    execpool.shutdown()
+                    execpool.join()
+                    raise gw._StopExecLoop
+                execpool.dispatch(gw._executetask, task)
+        """ % num)
+        self._remotechannelthread = self.remote_exec(source)
+
+    def newchannel(self): 
+        """ return new channel object.  """ 
+        return self._channelfactory.new()
+
+    def join(self, joinexec=True):
+        """ Wait for all IO (and by default all execution activity) 
+            to stop. the joinexec parameter is obsolete. 
+        """
+        current = threading.currentThread()
+        if self._receiverthread.isAlive():
+            self._trace("joining receiver thread")
+            self._receiverthread.join()
+
+    def exit(self):
+        """ Try to stop all exec and IO activity. """
+        self._cleanup.unregister(self)
+        self._stopexec()
+        self._stopsend()
+        try:
+            py._com.pyplugins.notify("gateway_exit", self)
+        except NameError: 
+            # XXX on the remote side 'py' is not imported 
+            # and so we can't notify 
+            pass
+
     def _remote_redirect(self, stdout=None, stderr=None): 
         """ return a handle representing a redirection of a remote 
             end's stdout to a local file object.  with handle.close() 
@@ -325,17 +346,6 @@
                         c.waitclose() 
         return Handle()
 
-    def exit(self):
-        """ Try to stop all exec and IO activity. """
-        self._cleanup.unregister(self)
-        self._stopexec()
-        self._stopsend()
-        try:
-            py._com.pyplugins.notify("gateway_exit", self)
-        except NameError: 
-            # on the remote side 'py' is not imported 
-            # and so we can't notify (XXX: make execnet synchronous) 
-            pass
 
     def _stopsend(self):
         self._send(None)
@@ -344,14 +354,6 @@
         if self._requestqueue is not None:
             self._requestqueue.put(None)
 
-    def join(self, joinexec=True):
-        """ Wait for all IO (and by default all execution activity) 
-            to stop. the joinexec parameter is obsolete. 
-        """
-        current = threading.currentThread()
-        if self._receiverthread.isAlive():
-            self._trace("joining receiver thread")
-            self._receiverthread.join()
 
 def getid(gw, cache={}):
     name = gw.__class__.__name__



More information about the pytest-commit mailing list