[py-svn] r17658 - in py/branch/monday/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Mon Sep 19 15:50:58 CEST 2005


Author: hpk
Date: Mon Sep 19 15:50:57 2005
New Revision: 17658

Modified:
   py/branch/monday/py/execnet/channel.py
   py/branch/monday/py/execnet/gateway.py
   py/branch/monday/py/execnet/testing/test_gateway.py
Log:
(arigo,hpk,jan)

- removed channel argument from gateway.remote_exec()
- removed receiver argument from newchannel()
- introduced new channel.setcallback(callback)
  that sends all received items to the callback



Modified: py/branch/monday/py/execnet/channel.py
==============================================================================
--- py/branch/monday/py/execnet/channel.py	(original)
+++ py/branch/monday/py/execnet/channel.py	Mon Sep 19 15:50:57 2005
@@ -24,18 +24,44 @@
     """Communication channel between two possibly remote threads of code. """
     RemoteError = RemoteError
 
-    def __init__(self, gateway, id, has_callback=False):
+    def __init__(self, gateway, id):
         assert isinstance(id, int)
         self.gateway = gateway
         self.id = id
-        if has_callback:
-            self._items = None
-        else:
-            self._items = Queue.Queue()
+        self._items = Queue.Queue()
         self._closed = False
         self._receiveclosed = threading.Event()
         self._remoteerrors = []
 
+    def setcallback(self, callback):
+        queue = self._items
+        lock = self.gateway.channelfactory._receivelock
+        lock.acquire()
+        try:
+            _callbacks = self.gateway.channelfactory._callbacks
+            if _callbacks.setdefault(self.id, callback) is not callback:
+                raise IOError("%r has callback already registered" %(self,))
+            self._items = None
+            while 1:
+                try:
+                    olditem = queue.get(block=False)
+                except Queue.Empty:
+                    break
+                else:
+                    if olditem is ENDMARKER:
+                        queue.put(olditem)
+                        break
+                    else:
+                        callback(olditem)
+            if self._closed or self._receiveclosed.isSet():
+                # no need to keep a callback
+                try:
+                    del _callbacks[self.id]
+                except KeyError:
+                    pass
+        finally:
+            lock.release()
+         
     def __repr__(self):
         flag = self.isclosed() and "closed" or "open"
         return "<Channel id=%d %s>" % (self.id, flag)
@@ -100,8 +126,9 @@
                 self._remoteerrors.append(error)
             self._closed = True         # --> "closed"
             self._receiveclosed.set()
-            if self._items is not None:
-                self._items.put(ENDMARKER)
+            queue = self._items
+            if queue is not None:
+                queue.put(ENDMARKER)
             self.gateway.channelfactory._no_longer_opened(self.id)
 
     def waitclose(self, timeout=None):
@@ -139,11 +166,12 @@
         reraised as channel.RemoteError exceptions containing
         a textual representation of the remote traceback.
         """
-        if self._items is None:
+        queue = self._items
+        if queue is None:
             raise IOError("calling receive() on channel with receiver callback")
-        x = self._items.get()
+        x = queue.get()
         if x is ENDMARKER: 
-            self._items.put(x)  # for other receivers 
+            queue.put(x)  # for other receivers 
             raise self._getremoteerror() or EOFError()
         else: 
             return x
@@ -170,20 +198,18 @@
         self._channels = weakref.WeakValueDictionary()
         self._callbacks = {}
         self._writelock = threading.Lock()
+        self._receivelock = threading.RLock()
         self.gateway = gateway
         self.count = startcount
 
-    def new(self, id=None, receiver=None):
+    def new(self, id=None):
         """ create a new Channel with 'id' (or create new id if None). """
         self._writelock.acquire()
         try:
             if id is None:
                 id = self.count
                 self.count += 2
-            has_callback = receiver is not None
-            if has_callback:
-                self._callbacks[id] = receiver
-            channel = Channel(self.gateway, id, has_callback)
+            channel = Channel(self.gateway, id)
             self._channels[id] = channel
             return channel
         finally:
@@ -217,8 +243,9 @@
                 channel._remoteerrors.append(remoteerror)
             channel._closed = True          # --> "closed"
             channel._receiveclosed.set()
-            if channel._items is not None:
-                channel._items.put(ENDMARKER)
+            queue = channel._items
+            if queue is not None:
+                queue.put(ENDMARKER)
         self._no_longer_opened(id)
 
     def _local_last_message(self, id):
@@ -229,21 +256,27 @@
         else:
             # state transition: if "opened", change to "sendonly"
             channel._receiveclosed.set()
-            if channel._items is not None:
-                channel._items.put(ENDMARKER)
+            queue = channel._items
+            if queue is not None:
+                queue.put(ENDMARKER)
         self._no_longer_opened(id)
 
     def _local_receive(self, id, data): 
         # executes in receiver thread
-        callback = self._callbacks.get(id)
-        if callback is not None:
-            callback(data)   # even if channel may be already closed
-        else:
-            channel = self._channels.get(id)
-            if channel is None or channel._items is None:
-                pass    # drop data
+        self._receivelock.acquire()
+        try:
+            callback = self._callbacks.get(id)
+            if callback is not None:
+                callback(data)   # even if channel may be already closed
             else:
-                channel._items.put(data)
+                channel = self._channels.get(id)
+                queue = channel and channel._items
+                if queue is None:
+                    pass    # drop data
+                else:
+                    queue.put(data)
+        finally:
+            self._receivelock.release()
 
     def _finished_receiving(self):
         for id in self._channels.keys():

Modified: py/branch/monday/py/execnet/gateway.py
==============================================================================
--- py/branch/monday/py/execnet/gateway.py	(original)
+++ py/branch/monday/py/execnet/gateway.py	Mon Sep 19 15:50:57 2005
@@ -137,7 +137,7 @@
                 channel.close() 
         return close 
 
-    def thread_executor(self, channel, (source, outid, errid, autoclose)):
+    def thread_executor(self, channel, (source, outid, errid)):
         """ worker thread to execute source objects from the execution queue. """
         from sys import exc_info
         try:
@@ -159,15 +159,7 @@
             channel.close(errortext)
             self.trace(errortext)
         else:
-            if autoclose:
-                channel.close()
-            else:
-                # the channel should usually be closed by Channel.__del__.
-                # Give it a better chance now.
-                try:
-                    del loc['channel']
-                except KeyError:
-                    pass
+            channel.close()
 
     def _local_schedulexec(self, channel, sourcetask): 
         self.trace("dispatching exec")
@@ -179,7 +171,8 @@
         if hasattr(callback, 'write'): 
             callback = callback.write 
         assert callable(callback) 
-        chan = self.newchannel(receiver=callback) 
+        chan = self.newchannel()
+        chan.setcallback(callback)
         return chan.id 
 
     # _____________________________________________________________________
@@ -187,18 +180,14 @@
     # High Level Interface
     # _____________________________________________________________________
     #
-    def newchannel(self, receiver=None): 
-        """ return new channel object. If a 'receiver' callback is provided 
-            it will be invoked on each received item. You cannot call 
-            receive() anymore on such a channel. 
-        """ 
-        return self.channelfactory.new(receiver=receiver) 
+    def newchannel(self): 
+        """ return new channel object.  """ 
+        return self.channelfactory.new()
 
-    def remote_exec(self, source, stdout=None, stderr=None, channel=None): 
+    def remote_exec(self, source, stdout=None, stderr=None): 
         """ return channel object for communicating with the asynchronously
             executing 'source' code which will have a corresponding 'channel'
-            object in its executing namespace. If a channel object is not
-            provided a new channel will be created.  
+            object in its executing namespace. 
         """
         try:
             source = str(Source(source))
@@ -208,26 +197,23 @@
                 source = str(py.code.Source(source))
             except ImportError: 
                 pass 
-        if channel is None: 
-            channel = self.newchannel() 
-            autoclose = True
-        else:
-            autoclose = False
+        channel = self.newchannel() 
         outid = self._newredirectchannelid(stdout) 
         errid = self._newredirectchannelid(stderr) 
         self._outgoing.put(Message.CHANNEL_OPEN(channel.id, 
-                               (source, outid, errid, autoclose)))
+                               (source, outid, errid)))
         return channel 
 
     def remote_redirect(self, stdout=None, stderr=None): 
-        """ return a handle representing a redirection of of remote 
+        """ return a handle representing a redirection of a remote 
             end's stdout to a local file object.  with handle.close() 
             the redirection will be reverted.   
         """ 
         clist = []
         for name, out in ('stdout', stdout), ('stderr', stderr): 
             if out: 
-                outchannel = self.newchannel(receiver=getattr(out, 'write', out))
+                outchannel = self.newchannel()
+                outchannel.setcallback(getattr(out, 'write', out))
                 channel = self.remote_exec(""" 
                     import sys
                     outchannel = channel.receive() 

Modified: py/branch/monday/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/monday/py/execnet/testing/test_gateway.py	(original)
+++ py/branch/monday/py/execnet/testing/test_gateway.py	Mon Sep 19 15:50:57 2005
@@ -162,41 +162,76 @@
 
     def test_channel_receiver_callback(self): 
         l = []
-        channel = self.gw.newchannel(receiver=l.append)
-        self.gw.remote_exec(channel=channel, source='''
+        #channel = self.gw.newchannel(receiver=l.append)
+        channel = self.gw.remote_exec(source='''
             channel.send(42)
             channel.send(13)
             channel.send(channel.gateway.newchannel())
             ''') 
+        channel.setcallback(callback=l.append)
+        py.test.raises(IOError, channel.receive)
         channel.waitclose(1.0) 
         assert len(l) == 3
         assert l[:2] == [42,13]
         assert isinstance(l[2], channel.__class__) 
 
+    def test_channel_callback_after_receive(self):
+        l = []
+        channel = self.gw.remote_exec(source='''
+            channel.send(42)
+            channel.send(13)
+            channel.send(channel.gateway.newchannel())
+            ''') 
+        x = channel.receive()
+        assert x == 42
+        channel.setcallback(callback=l.append)
+        py.test.raises(IOError, channel.receive)
+        channel.waitclose(1.0) 
+        assert len(l) == 2
+        assert l[0] == 13
+        assert isinstance(l[1], channel.__class__) 
+
+    def test_waiting_for_callbacks(self):
+        l = []
+        def callback(msg):
+            import time; time.sleep(0.2)
+            l.append(msg)
+        channel = self.gw.remote_exec(source='''
+            channel.send(42)
+            ''')
+        channel.setcallback(callback)
+        channel.waitclose(1.0)
+        assert l == [42]
+
     def test_channel_callback_stays_active(self, earlyfree=True):
         # with 'earlyfree==True', this tests the "sendonly" channel state.
         l = []
-        channel = self.gw.newchannel(receiver=l.append)
-        self.gw.remote_exec(channel=channel, source='''
+        channel = self.gw.remote_exec(source='''
             import thread, time
-            def producer(channel):
+            def producer(subchannel):
                 for i in range(5):
                     time.sleep(0.15)
-                    channel.send(i*100)
-            thread.start_new_thread(producer, (channel,))
+                    subchannel.send(i*100)
+            channel2 = channel.receive()
+            thread.start_new_thread(producer, (channel2,))
+            del channel2
             ''')
+        subchannel = self.gw.newchannel()
+        subchannel.setcallback(l.append)
+        channel.send(subchannel)
         if earlyfree:
-            channel = None
+            subchannel = None
         counter = 100
         while len(l) < 5:
-            if channel and channel.isclosed():
+            if subchannel and subchannel.isclosed():
                 break
             counter -= 1
+            print counter
             if not counter:
-                py.test.fail("timed out waiting for the answer[%d]" % i)
+                py.test.fail("timed out waiting for the answer[%d]" % len(l))
             time.sleep(0.04)   # busy-wait
         assert l == [0, 100, 200, 300, 400]
-        return channel
+        return subchannel
 
     def test_channel_callback_remote_freed(self):
         channel = self.test_channel_callback_stays_active(False)



More information about the pytest-commit mailing list