[py-svn] r7108 - py/dist/py/execnet

hpk at codespeak.net hpk at codespeak.net
Sat Oct 23 23:06:22 CEST 2004


Author: hpk
Date: Sat Oct 23 23:06:22 2004
New Revision: 7108

Modified:
   py/dist/py/execnet/gateway.py
   py/dist/py/execnet/test_gateway.py
Log:
New feature: allow passing channels over channels (experimental).

This allows two program fragments to open multiple channels
to each other.
 


Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py	(original)
+++ py/dist/py/execnet/gateway.py	Sat Oct 23 23:06:22 2004
@@ -162,6 +162,11 @@
         self._outgoing.put(Message.CHANNEL_OPEN(channel.id, source))
         return channel
 
+    def newchannel(self):
+        """ return a new independent channel. """
+        return self.channelfactory.new()
+
+
 class Channel(object):
     """Communication channel between two possibly remote threads of code. """
     def __init__(self, gateway, id):
@@ -201,8 +206,13 @@
         possibly blocking if the sender queue is full. 
         Note that an item needs to be marshallable. 
         """
-        s = py.std.marshal.dumps(item) 
-        self.gateway._outgoing.put(Message.CHANNEL_DATA(self.id, s)) 
+        if isinstance(item, Channel):
+            s = py.std.marshal.dumps(item.id) 
+            data = Message.CHANNEL_NEW(self.id, s) 
+        else: 
+            s = py.std.marshal.dumps(item) 
+            data = Message.CHANNEL_DATA(self.id, s) 
+        self.gateway._outgoing.put(data)
 
     def receive(self):
         """receives an item that was sent from the other side, 
@@ -337,11 +347,21 @@
             channel = Channel(gateway, self.channelid) 
             gateway.channelfactory[self.channelid] = channel 
             gateway._execqueue.put((channel, self.data)) 
+
+    class CHANNEL_NEW(Message):
+        def received(self, gateway):
+            newid = py.std.marshal.loads(self.data)
+            newchannel = Channel(gateway, newid) 
+            gateway.channelfactory[newid] = newchannel 
+            channel = gateway.channelfactory[self.channelid] 
+            channel._items.put(newchannel) 
+
     class CHANNEL_DATA(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
             x = py.std.marshal.loads(self.data)
             channel._items.put(x) 
+
     class CHANNEL_CLOSE(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]

Modified: py/dist/py/execnet/test_gateway.py
==============================================================================
--- py/dist/py/execnet/test_gateway.py	(original)
+++ py/dist/py/execnet/test_gateway.py	Sat Oct 23 23:06:22 2004
@@ -96,6 +96,16 @@
         assert x == 42 
         py.test.raises(gateway.RemoteError, channel.receive) 
 
+    def test_channel_passing_over_channel(self):
+        channel = self.gw.remote_exec('''
+                    c = channel.gateway.newchannel()
+                    channel.send(c) 
+                    c.send(42)
+                  ''')
+        c = channel.receive()
+        x = c.receive()
+        assert x == 42 
+
 class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution): 
     def test_many_popen(self):
         num = 4



More information about the pytest-commit mailing list