[py-svn] r10860 - in py/dist/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Tue Apr 19 17:14:03 CEST 2005


Author: hpk
Date: Tue Apr 19 17:14:03 2005
New Revision: 10860

Modified:
   py/dist/py/execnet/channel.py
   py/dist/py/execnet/gateway.py
   py/dist/py/execnet/message.py
   py/dist/py/execnet/register.py
   py/dist/py/execnet/testing/test_gateway.py
Log:
Rename internal methods (mostly to a `_local_` prefix) to make the code more readable.



Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py	(original)
+++ py/dist/py/execnet/channel.py	Tue Apr 19 17:14:03 2005
@@ -42,7 +42,7 @@
     #
     # internal methods, called from the receiver thread 
     #
-    def _do_close(self, stickyerror=EOFError()):
+    def _local_close(self, stickyerror=EOFError()):
         if self.id in self.gateway.channelfactory:
             del self.gateway.channelfactory[self.id]
         self._stickyerror = stickyerror
@@ -52,21 +52,21 @@
         self._closeevent.set()
 
 
-    def _do_receivechannel(self, newid):
+    def _local_receivechannel(self, newid):
         """ receive a remotely created new (sub)channel. """
         newchannel = Channel(self.gateway, newid)
         self.gateway.channelfactory[newid] = newchannel
         #self._depchannel.append(newchannel)
         self._items.put(newchannel)
 
-    def _do_receivedata(self, data): 
+    def _local_receivedata(self, data): 
         if self._callback is not None: 
-            self.gateway._dispatchcallback(self._callback, data) 
+            self.gateway._local_dispatchcallback(self._callback, data) 
         else: 
             self._items.put(data) 
 
-    def _do_scheduleexec(self, sourcetask): 
-        self.gateway._scheduleexec(channel=self, sourcetask=sourcetask) 
+    def _local_schedulexec(self, sourcetask): 
+        self.gateway._local_schedulexec(channel=self, sourcetask=sourcetask) 
 
     #
     # public API for channel objects 
@@ -94,10 +94,7 @@
                 put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
             else:
                 put(Message.CHANNEL_CLOSE(self.id))
-            self._do_close()
-
-    def remote_exec(self, source, stdout=None, stderr=None): 
-        self.gateway._remote_exec(self, source, stdout, stderr) 
+            self._local_close()
 
     def newchannel(self):
         """ return a new independent channel."""
@@ -164,7 +161,7 @@
 class ChannelFactory(object):
     def __init__(self, gateway, startcount=1):
         self._dict = dict()
-        self._lock = threading.RLock()
+        self._lock = threading.Lock()
         self.gateway = gateway
         self.count = startcount
 

Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py	(original)
+++ py/dist/py/execnet/gateway.py	Tue Apr 19 17:14:03 2005
@@ -45,34 +45,45 @@
         return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__, 
                                                    R, S, i) 
 
-    def _stopexec(self):
-        #self.pool.prunestopped()
-        self._execpool.shutdown() 
+    def _local_trystopexec(self):
+        try: 
+            self._execpool.shutdown() 
+        except IOError: 
+            return False 
+        return True
 
     def exit(self):
+        """ initiate full gateway teardown.   
+            Note that the  teardown of sender/receiver threads happens 
+            asynchronously and timeouts on stopping worker execution 
+            threads are ignored.  You can issue join() or join(joinexec=False) 
+            if you want to wait for a full teardown (possibly excluding 
+            execution threads). 
+        """ 
         # note that threads may still be scheduled to start
         # during our execution! 
         self._exitlock.acquire()
         try:
             if self.running: 
                 self.running = False 
-                self._stopexec()
-                if self.pool.getstarted('sender'): 
-                    self._outgoing.put(Message.EXIT_GATEWAY())
-                self.trace("exit procedure triggered, pid %d, gateway %r" % (
-                           os.getpid(), self))
+                if not self.pool.getstarted('sender'): 
+                    raise IOError("sender thread not alive anymore!") 
+                self._outgoing.put(Message.EXIT_GATEWAY())
+                self.trace("exit procedure triggered, pid %d " % (os.getpid(),))
                 _gateways.remove(self) 
         finally:
             self._exitlock.release()
 
-    def join(self):
+    def join(self, joinexec=True):
         current = threading.currentThread()
         for x in self.pool.getstarted(): 
             if x != current: 
                 self.trace("joining %s" % x)
                 x.join()
-        self._execpool.join()
-        self.trace("joining threads finished, current %r" % current) 
+        self.trace("joining sender/reciver threads finished, current %r" % current) 
+        if joinexec: 
+            self._execpool.join()
+            self.trace("joining execution threads finished, current %r" % current) 
 
     def trace(self, *args):
         if debug:
@@ -125,11 +136,11 @@
         finally:
             self.trace('leaving %r' % threading.currentThread())
 
-    def _redirect_thread_output(self, outid, errid): 
+    def _local_redirect_thread_output(self, outid, errid): 
         l = []
         for name, id in ('stdout', outid), ('stderr', errid): 
             if id: 
-                channel = self._makechannel(outid) 
+                channel = self._local_makechannelobject(outid) 
                 out = ThreadOut(sys, name)
                 out.setwritefunc(channel.send) 
                 l.append((out, channel))
@@ -139,7 +150,7 @@
                 channel.close() 
         return close 
 
-    def _makechannel(self, newid): 
+    def _local_makechannelobject(self, newid): 
         newchannel = Channel(self, newid) 
         self.channelfactory[newid] = newchannel
         return newchannel 
@@ -149,7 +160,7 @@
         try:
             loc = { 'channel' : channel }
             self.trace("execution starts:", repr(source)[:50])
-            close = self._redirect_thread_output(outid, errid) 
+            close = self._local_redirect_thread_output(outid, errid) 
             try:
                 co = compile(source+'\n', '', 'exec', 4096)
                 exec co in loc
@@ -167,31 +178,17 @@
         else:
             channel.close()
 
-    def _scheduleexec(self, channel, sourcetask): 
+    def _local_schedulexec(self, channel, sourcetask): 
         self.trace("dispatching exec")
         self._execpool.dispatch(self.thread_executor, channel, sourcetask) 
 
-    def _dispatchcallback(self, callback, data): 
+    def _local_dispatchcallback(self, callback, data): 
         # XXX this should run in a separate thread because
         #     we might otherwise block the receiver thread 
         #     where we get called from 
         callback(data) 
 
-    def _remote_exec(self, channel, source, stdout=None, stderr=None): 
-        try:
-            source = str(Source(source))
-        except NameError: 
-            try: 
-                import py 
-                source = str(py.code.Source(source))
-            except ImportError: 
-                pass 
-        outid = self._redirectchannelid(stdout) 
-        errid = self._redirectchannelid(stderr) 
-        self._outgoing.put(Message.CHANNEL_OPEN(channel.id, 
-                                                (source, outid, errid))) 
-
-    def _redirectchannelid(self, callback): 
+    def _newredirectchannelid(self, callback): 
         if callback is None: 
             return  
         if hasattr(callback, 'write'): 
@@ -210,15 +207,26 @@
         """ return new channel object. """ 
         return self.channelfactory.new() 
 
-    def remote_exec(self, source, stdout=None, stderr=None): 
+    def remote_exec(self, source, stdout=None, stderr=None, channel=None): 
         """ return channel object for communicating with the asynchronously
             executing 'source' code which will have a corresponding 'channel'
             object in its executing namespace. If a channel object is not
-            provided a new channel will be created. If a channel is provided
-            is will be returned as well. 
+            provided a new channel will be created.  
         """
-        channel = self.newchannel() 
-        channel.remote_exec(source, stdout=stdout, stderr=stderr) 
+        try:
+            source = str(Source(source))
+        except NameError: 
+            try: 
+                import py 
+                source = str(py.code.Source(source))
+            except ImportError: 
+                pass 
+        if channel is None: 
+            channel = self.newchannel() 
+        outid = self._newredirectchannelid(stdout) 
+        errid = self._newredirectchannelid(stderr) 
+        self._outgoing.put(Message.CHANNEL_OPEN(channel.id, 
+                                                (source, outid, errid))) 
         return channel 
 
     def remote_redirect(self, stdout=None, stderr=None): 

Modified: py/dist/py/execnet/message.py
==============================================================================
--- py/dist/py/execnet/message.py	(original)
+++ py/dist/py/execnet/message.py	Tue Apr 19 17:14:03 2005
@@ -63,14 +63,14 @@
     class EXIT_GATEWAY(Message):
         def received(self, gateway):
             # executes in receiver thread 
-            gateway._stopexec()
             for x in gateway.channelfactory.values():
-                x._do_close()
+                x._local_close()
             gateway._outgoing.put(self.STOP_RECEIVING())
+            gateway._local_trystopexec()
             raise SystemExit
         def post_sent(self, gateway, excinfo=None):
-            gateway._stopexec()
             # executes in sender thread 
+            gateway._local_trystopexec()
             gateway.io.close_write()
             raise SystemExit
 
@@ -81,7 +81,7 @@
             # already. With sockets closing it would raise
             # a Transport Not Connected exception
             for x in gateway.channelfactory.values():
-                x._do_close()
+                x._local_close()
             raise SystemExit
         def post_sent(self, gateway, excinfo=None):
             # after we sent STOP_RECEIVING we don't
@@ -92,28 +92,28 @@
     class CHANNEL_OPEN(Message):
         def received(self, gateway):
             channel = gateway.channelfactory.new(self.channelid)
-            channel._do_scheduleexec(self.data) 
+            channel._local_schedulexec(self.data) 
 
     class CHANNEL_NEW(Message):
         def received(self, gateway):
             newid = self.data
             channel = gateway.channelfactory[self.channelid]
-            channel._do_receivechannel(newid)
+            channel._local_receivechannel(newid)
 
     class CHANNEL_DATA(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._do_receivedata(self.data) 
+            channel._local_receivedata(self.data) 
 
     class CHANNEL_CLOSE(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._do_close()
+            channel._local_close()
 
     class CHANNEL_CLOSE_ERROR(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._do_close(channel.RemoteError(self.data))
+            channel._local_close(channel.RemoteError(self.data))
 
     classes = [x for x in locals().values() if hasattr(x, '__bases__')]
     classes.sort(lambda x,y : cmp(x.__name__, y.__name__))

Modified: py/dist/py/execnet/register.py
==============================================================================
--- py/dist/py/execnet/register.py	(original)
+++ py/dist/py/execnet/register.py	Tue Apr 19 17:14:03 2005
@@ -42,7 +42,7 @@
         """
         bootstrap = ["we_are_remote=True", extra]
         bootstrap += [getsource(x) for x in startup_modules]
-        bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join()",]
+        bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",]
         source = "\n".join(bootstrap)
         self.trace("sending gateway bootstrap code")
         io.write('%r\n' % source)

Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py	(original)
+++ py/dist/py/execnet/testing/test_gateway.py	Tue Apr 19 17:14:03 2005
@@ -125,14 +125,14 @@
         assert x == 42
         py.test.raises(channel.RemoteError, channel.receive)
 
-    def test_channel__do_close(self):
+    def test_channel__local_close(self):
         channel = self.gw.channelfactory.new()
-        channel._do_close()
+        channel._local_close()
         channel.waitclose(0.1)
 
-    def test_channel__do_close_error(self):
+    def test_channel__local_close_error(self):
         channel = self.gw.channelfactory.new()
-        channel._do_close(channel.RemoteError("error"))
+        channel._local_close(channel.RemoteError("error"))
         py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
 
     def test_channel_iter(self):
@@ -166,7 +166,7 @@
         channel = self.gw.newchannel()
         l = []
         channel.setcallback(l.append) 
-        channel.remote_exec('''
+        self.gw.remote_exec(channel=channel, source='''
             channel.send(42)
             channel.send(13)
             ''') 
@@ -243,6 +243,22 @@
         assert first.strip() == 'hello world' 
         py.test.raises(EOFError, channel.receive) 
 
+#class TestBlockingIssues: 
+#    def test_join_blocked_execution_gateway(self): 
+#        gateway = py.execnet.PopenGateway() 
+#        channel = gateway.remote_exec("""
+#            time.sleep(5.0)
+#        """)
+#        def doit(): 
+#            gateway.exit() 
+#            gateway.join(joinexec=True) 
+#            return 17 
+#
+#        pool = py._thread.WorkerPool() 
+#        reply = pool.dispatch(doit) 
+#        x = reply.get(timeout=1.0) 
+#        assert x == 17 
+
 class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
     #disabled = True
     def test_many_popen(self):



More information about the pytest-commit mailing list