[py-svn] r7108 - py/dist/py/execnet
hpk at codespeak.net
hpk at codespeak.net
Sat Oct 23 23:06:22 CEST 2004
Author: hpk
Date: Sat Oct 23 23:06:22 2004
New Revision: 7108
Modified:
py/dist/py/execnet/gateway.py
py/dist/py/execnet/test_gateway.py
Log:
new feature: allow passing channels over channels (experimental)
this allows two program fragments to open multiple channels
to each other.
Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py (original)
+++ py/dist/py/execnet/gateway.py Sat Oct 23 23:06:22 2004
@@ -162,6 +162,11 @@
self._outgoing.put(Message.CHANNEL_OPEN(channel.id, source))
return channel
+ def newchannel(self):
+ """ return a new independent channel. """
+ return self.channelfactory.new()
+
+
class Channel(object):
"""Communication channel between two possibly remote threads of code. """
def __init__(self, gateway, id):
@@ -201,8 +206,13 @@
possibly blocking if the sender queue is full.
Note that an item needs to be marshallable.
"""
- s = py.std.marshal.dumps(item)
- self.gateway._outgoing.put(Message.CHANNEL_DATA(self.id, s))
+ if isinstance(item, Channel):
+ s = py.std.marshal.dumps(item.id)
+ data = Message.CHANNEL_NEW(self.id, s)
+ else:
+ s = py.std.marshal.dumps(item)
+ data = Message.CHANNEL_DATA(self.id, s)
+ self.gateway._outgoing.put(data)
def receive(self):
"""receives an item that was sent from the other side,
@@ -337,11 +347,21 @@
channel = Channel(gateway, self.channelid)
gateway.channelfactory[self.channelid] = channel
gateway._execqueue.put((channel, self.data))
+
+ class CHANNEL_NEW(Message):
+ def received(self, gateway):
+ newid = py.std.marshal.loads(self.data)
+ newchannel = Channel(gateway, newid)
+ gateway.channelfactory[newid] = newchannel
+ channel = gateway.channelfactory[self.channelid]
+ channel._items.put(newchannel)
+
class CHANNEL_DATA(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
x = py.std.marshal.loads(self.data)
channel._items.put(x)
+
class CHANNEL_CLOSE(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
Modified: py/dist/py/execnet/test_gateway.py
==============================================================================
--- py/dist/py/execnet/test_gateway.py (original)
+++ py/dist/py/execnet/test_gateway.py Sat Oct 23 23:06:22 2004
@@ -96,6 +96,16 @@
assert x == 42
py.test.raises(gateway.RemoteError, channel.receive)
+ def test_channel_passing_over_channel(self):
+ channel = self.gw.remote_exec('''
+ c = channel.gateway.newchannel()
+ channel.send(c)
+ c.send(42)
+ ''')
+ c = channel.receive()
+ x = c.receive()
+ assert x == 42
+
class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
def test_many_popen(self):
num = 4
More information about the pytest-commit
mailing list