[py-svn] r56908 - in py/branch/event/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Sat Aug 2 08:49:03 CEST 2008


Author: hpk
Date: Sat Aug  2 08:49:01 2008
New Revision: 56908

Modified:
   py/branch/event/py/execnet/channel.py
   py/branch/event/py/execnet/gateway.py
   py/branch/event/py/execnet/message.py
   py/branch/event/py/execnet/testing/test_gateway.py
Log:
add experimental pickling support to execnet channels  


Modified: py/branch/event/py/execnet/channel.py
==============================================================================
--- py/branch/event/py/execnet/channel.py	(original)
+++ py/branch/event/py/execnet/channel.py	Sat Aug  2 08:49:01 2008
@@ -1,5 +1,7 @@
 import threading, weakref, sys
 import Queue
+from cPickle import Pickler, Unpickler
+
 if 'Message' not in globals():
     from py.__.execnet.message import Message
 
@@ -25,8 +27,9 @@
 class Channel(object):
     """Communication channel between two possibly remote threads of code. """
     RemoteError = RemoteError
+    _picklememo = None  # not None means that we are in pickling mode 
 
-    def __init__(self, gateway, id):
+    def __init__(self, gateway, id, pickle=False):
         assert isinstance(id, int)
         self.gateway = gateway
         self.id = id
@@ -34,6 +37,8 @@
         self._closed = False
         self._receiveclosed = threading.Event()
         self._remoteerrors = []
+        if pickle:
+            self._picklememo = {}
 
     def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
         queue = self._items
@@ -156,6 +161,13 @@
         if isinstance(item, Channel):
             data = Message.CHANNEL_NEW(self.id, item.id)
         else:
+            if self._picklememo is not None:
+                from cStringIO import StringIO  
+                f = StringIO()
+                pickler = Pickler(f, protocol=-1)  # use best protocol
+                pickler.memo = self._picklememo
+                pickler.dump(item)
+                item = f.getvalue()
             data = Message.CHANNEL_DATA(self.id, item)
         self.gateway._send(data)
 
@@ -203,7 +215,7 @@
         self.count = startcount
         self.finished = False
 
-    def new(self, id=None):
+    def new(self, id=None, pickle=False):
         """ create a new Channel with 'id' (or create new id if None). """
         self._writelock.acquire()
         try:
@@ -212,7 +224,7 @@
             if id is None:
                 id = self.count
                 self.count += 2
-            channel = Channel(self.gateway, id)
+            channel = Channel(self.gateway, id, pickle=pickle)
             self._channels[id] = channel
             return channel
         finally:
@@ -270,14 +282,16 @@
                 queue.put(ENDMARKER)
         self._no_longer_opened(id)
 
-    def _local_receive(self, id, data): 
+    def _local_receive(self, id, data, tryunpickle=False): 
         # executes in receiver thread
         self._receivelock.acquire()
         try:
+            channel = self._channels.get(id)
+            if tryunpickle and channel and channel._picklememo is not None:
+                data = unpickle(data, channel._picklememo)
             try:
                 callback, endmarker = self._callbacks[id]
             except KeyError:
-                channel = self._channels.get(id)
                 queue = channel and channel._items
                 if queue is None:
                     pass    # drop data
@@ -319,3 +333,10 @@
         state = self.channel.isclosed() and 'closed' or 'open'
         return '<ChannelFile %d %s>' %(self.channel.id, state) 
 
+
+def unpickle(data, memo):
+    from cStringIO import StringIO
+    f = StringIO(data)
+    u = Unpickler(f)
+    u.memo = memo
+    return u.load()

Modified: py/branch/event/py/execnet/gateway.py
==============================================================================
--- py/branch/event/py/execnet/gateway.py	(original)
+++ py/branch/event/py/execnet/gateway.py	Sat Aug  2 08:49:01 2008
@@ -232,7 +232,7 @@
                 exec co in loc
             finally:
                 close() 
-                self._trace("execution finished:", repr(source)[:50])
+                self._trace("execution finished:", repr(source)[:500])
         except (KeyboardInterrupt, SystemExit):
             pass 
         except self._StopExecLoop:
@@ -262,17 +262,19 @@
     # High Level Interface
     # _____________________________________________________________________
     #
-    def newchannel(self): 
+    def newchannel(self):
         """ return new channel object.  """ 
         return self._channelfactory.new()
 
-    def remote_exec(self, source, stdout=None, stderr=None): 
+    def remote_exec(self, source, stdout=None, stderr=None, _pickle=False):
         """ return channel object and connect it to a remote
             execution thread where the given 'source' executes
             and has the sister 'channel' object in its global 
             namespace.  The callback functions 'stdout' and 
             'stderr' get called on receival of remote 
             stdout/stderr output strings. 
+            _pickle: set to true to enable experimental support for 
+            sending/receiving picklable objects through the channel.
         """
         try:
             source = str(Source(source))
@@ -282,11 +284,11 @@
                 source = str(py.code.Source(source))
             except ImportError: 
                 pass 
-        channel = self.newchannel() 
+        channel = self._channelfactory.new(pickle=_pickle)
         outid = self._newredirectchannelid(stdout) 
         errid = self._newredirectchannelid(stderr) 
         self._send(Message.CHANNEL_OPEN(
-                    channel.id, (source, outid, errid)))
+                    channel.id, (_pickle, (source, outid, errid))))
         return channel 
 
     def _remote_redirect(self, stdout=None, stderr=None): 

Modified: py/branch/event/py/execnet/message.py
==============================================================================
--- py/branch/event/py/execnet/message.py	(original)
+++ py/branch/event/py/execnet/message.py	Sat Aug  2 08:49:01 2008
@@ -62,8 +62,9 @@
 
     class CHANNEL_OPEN(Message):
         def received(self, gateway):
-            channel = gateway._channelfactory.new(self.channelid)
-            gateway._local_schedulexec(channel=channel, sourcetask=self.data)
+            pickle, sourcetask = self.data 
+            channel = gateway._channelfactory.new(self.channelid, pickle=pickle)
+            gateway._local_schedulexec(channel=channel, sourcetask=sourcetask)
 
     class CHANNEL_NEW(Message):
         def received(self, gateway):
@@ -74,7 +75,8 @@
 
     class CHANNEL_DATA(Message):
         def received(self, gateway):
-            gateway._channelfactory._local_receive(self.channelid, self.data)
+            gateway._channelfactory._local_receive(self.channelid, self.data, 
+                tryunpickle=True)
 
     class CHANNEL_CLOSE(Message):
         def received(self, gateway):

Modified: py/branch/event/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/event/py/execnet/testing/test_gateway.py	(original)
+++ py/branch/event/py/execnet/testing/test_gateway.py	Sat Aug  2 08:49:01 2008
@@ -344,6 +344,22 @@
         assert first.strip() == 'hello world' 
         py.test.raises(EOFError, channel.receive)
 
+    def test_pickling(self):
+        channel = self.gw.remote_exec("""
+            l1 = channel.receive()
+            l2 = channel.receive()
+            channel.send(l1 + l2)
+            channel.send(l2 is l1)
+        """, _pickle=True)
+        assert isinstance(channel._picklememo, dict)
+        l1 = ['hello']
+        channel.send(l1)
+        channel.send(l1)
+        newl = channel.receive()
+        isl = channel.receive()
+        assert newl == l1 + l1
+        assert isl == True
+
     def test_confusion_from_os_write_stdout(self):
         channel = self.gw.remote_exec("""
             import os
@@ -383,7 +399,7 @@
         """)
         text = c1.receive()
         assert text.find("execution disallowed") != -1 
-    
+        
 #class TestBlockingIssues: 
 #    def test_join_blocked_execution_gateway(self): 
 #        gateway = py.execnet.PopenGateway() 
@@ -411,7 +427,7 @@
         c = gw.remote_exec("import os ; channel.send(os.getcwd())")
         x = c.receive()
         assert x == str(waschangedir)
-        
+
     def test_many_popen(self):
         num = 4
         l = []



More information about the pytest-commit mailing list