[py-svn] r56908 - in py/branch/event/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Sat Aug 2 08:49:03 CEST 2008
Author: hpk
Date: Sat Aug 2 08:49:01 2008
New Revision: 56908
Modified:
py/branch/event/py/execnet/channel.py
py/branch/event/py/execnet/gateway.py
py/branch/event/py/execnet/message.py
py/branch/event/py/execnet/testing/test_gateway.py
Log:
add experimental pickling support to execnet channels
Modified: py/branch/event/py/execnet/channel.py
==============================================================================
--- py/branch/event/py/execnet/channel.py (original)
+++ py/branch/event/py/execnet/channel.py Sat Aug 2 08:49:01 2008
@@ -1,5 +1,7 @@
import threading, weakref, sys
import Queue
+from cPickle import Pickler, Unpickler
+
if 'Message' not in globals():
from py.__.execnet.message import Message
@@ -25,8 +27,9 @@
class Channel(object):
"""Communication channel between two possibly remote threads of code. """
RemoteError = RemoteError
+ _picklememo = None # not None means that we are in pickling mode
- def __init__(self, gateway, id):
+ def __init__(self, gateway, id, pickle=False):
assert isinstance(id, int)
self.gateway = gateway
self.id = id
@@ -34,6 +37,8 @@
self._closed = False
self._receiveclosed = threading.Event()
self._remoteerrors = []
+ if pickle:
+ self._picklememo = {}
def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
queue = self._items
@@ -156,6 +161,13 @@
if isinstance(item, Channel):
data = Message.CHANNEL_NEW(self.id, item.id)
else:
+ if self._picklememo is not None:
+ from cStringIO import StringIO
+ f = StringIO()
+ pickler = Pickler(f, protocol=-1) # use best protocol
+ pickler.memo = self._picklememo
+ pickler.dump(item)
+ item = f.getvalue()
data = Message.CHANNEL_DATA(self.id, item)
self.gateway._send(data)
@@ -203,7 +215,7 @@
self.count = startcount
self.finished = False
- def new(self, id=None):
+ def new(self, id=None, pickle=False):
""" create a new Channel with 'id' (or create new id if None). """
self._writelock.acquire()
try:
@@ -212,7 +224,7 @@
if id is None:
id = self.count
self.count += 2
- channel = Channel(self.gateway, id)
+ channel = Channel(self.gateway, id, pickle=pickle)
self._channels[id] = channel
return channel
finally:
@@ -270,14 +282,16 @@
queue.put(ENDMARKER)
self._no_longer_opened(id)
- def _local_receive(self, id, data):
+ def _local_receive(self, id, data, tryunpickle=False):
# executes in receiver thread
self._receivelock.acquire()
try:
+ channel = self._channels.get(id)
+ if tryunpickle and channel and channel._picklememo is not None:
+ data = unpickle(data, channel._picklememo)
try:
callback, endmarker = self._callbacks[id]
except KeyError:
- channel = self._channels.get(id)
queue = channel and channel._items
if queue is None:
pass # drop data
@@ -319,3 +333,10 @@
state = self.channel.isclosed() and 'closed' or 'open'
return '<ChannelFile %d %s>' %(self.channel.id, state)
+
+def unpickle(data, memo):
+ from cStringIO import StringIO
+ f = StringIO(data)
+ u = Unpickler(f)
+ u.memo = memo
+ return u.load()
Modified: py/branch/event/py/execnet/gateway.py
==============================================================================
--- py/branch/event/py/execnet/gateway.py (original)
+++ py/branch/event/py/execnet/gateway.py Sat Aug 2 08:49:01 2008
@@ -232,7 +232,7 @@
exec co in loc
finally:
close()
- self._trace("execution finished:", repr(source)[:50])
+ self._trace("execution finished:", repr(source)[:500])
except (KeyboardInterrupt, SystemExit):
pass
except self._StopExecLoop:
@@ -262,17 +262,19 @@
# High Level Interface
# _____________________________________________________________________
#
- def newchannel(self):
+ def newchannel(self):
""" return new channel object. """
return self._channelfactory.new()
- def remote_exec(self, source, stdout=None, stderr=None):
+ def remote_exec(self, source, stdout=None, stderr=None, _pickle=False):
""" return channel object and connect it to a remote
execution thread where the given 'source' executes
and has the sister 'channel' object in its global
namespace. The callback functions 'stdout' and
'stderr' get called on receival of remote
stdout/stderr output strings.
+ _pickle: set to true to enable experimental support for
+ sending/receiving picklable objects through the channel.
"""
try:
source = str(Source(source))
@@ -282,11 +284,11 @@
source = str(py.code.Source(source))
except ImportError:
pass
- channel = self.newchannel()
+ channel = self._channelfactory.new(pickle=_pickle)
outid = self._newredirectchannelid(stdout)
errid = self._newredirectchannelid(stderr)
self._send(Message.CHANNEL_OPEN(
- channel.id, (source, outid, errid)))
+ channel.id, (_pickle, (source, outid, errid))))
return channel
def _remote_redirect(self, stdout=None, stderr=None):
Modified: py/branch/event/py/execnet/message.py
==============================================================================
--- py/branch/event/py/execnet/message.py (original)
+++ py/branch/event/py/execnet/message.py Sat Aug 2 08:49:01 2008
@@ -62,8 +62,9 @@
class CHANNEL_OPEN(Message):
def received(self, gateway):
- channel = gateway._channelfactory.new(self.channelid)
- gateway._local_schedulexec(channel=channel, sourcetask=self.data)
+ pickle, sourcetask = self.data
+ channel = gateway._channelfactory.new(self.channelid, pickle=pickle)
+ gateway._local_schedulexec(channel=channel, sourcetask=sourcetask)
class CHANNEL_NEW(Message):
def received(self, gateway):
@@ -74,7 +75,8 @@
class CHANNEL_DATA(Message):
def received(self, gateway):
- gateway._channelfactory._local_receive(self.channelid, self.data)
+ gateway._channelfactory._local_receive(self.channelid, self.data,
+ tryunpickle=True)
class CHANNEL_CLOSE(Message):
def received(self, gateway):
Modified: py/branch/event/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/event/py/execnet/testing/test_gateway.py (original)
+++ py/branch/event/py/execnet/testing/test_gateway.py Sat Aug 2 08:49:01 2008
@@ -344,6 +344,22 @@
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
+ def test_pickling(self):
+ channel = self.gw.remote_exec("""
+ l1 = channel.receive()
+ l2 = channel.receive()
+ channel.send(l1 + l2)
+ channel.send(l2 is l1)
+ """, _pickle=True)
+ assert isinstance(channel._picklememo, dict)
+ l1 = ['hello']
+ channel.send(l1)
+ channel.send(l1)
+ newl = channel.receive()
+ isl = channel.receive()
+ assert newl == l1 + l1
+ assert isl == True
+
def test_confusion_from_os_write_stdout(self):
channel = self.gw.remote_exec("""
import os
@@ -383,7 +399,7 @@
""")
text = c1.receive()
assert text.find("execution disallowed") != -1
-
+
#class TestBlockingIssues:
# def test_join_blocked_execution_gateway(self):
# gateway = py.execnet.PopenGateway()
@@ -411,7 +427,7 @@
c = gw.remote_exec("import os ; channel.send(os.getcwd())")
x = c.receive()
assert x == str(waschangedir)
-
+
def test_many_popen(self):
num = 4
l = []
More information about the pytest-commit mailing list