[py-svn] r57100 - in py/branch/event/py/test2/rsession: . testing

hpk at codespeak.net hpk at codespeak.net
Fri Aug 8 15:26:30 CEST 2008


Author: hpk
Date: Fri Aug  8 15:26:29 2008
New Revision: 57100

Modified:
   py/branch/event/py/test2/rsession/pickle.py
   py/branch/event/py/test2/rsession/testing/test_pickle.py
Log:
Introduce PickleChannel, a new transparent overlay for execnet channels
that helps maintain object identity between the two sides.


Modified: py/branch/event/py/test2/rsession/pickle.py
==============================================================================
--- py/branch/event/py/test2/rsession/pickle.py	(original)
+++ py/branch/event/py/test2/rsession/pickle.py	Fri Aug  8 15:26:29 2008
@@ -1,6 +1,7 @@
 
 from cStringIO import StringIO
 from cPickle import Pickler, Unpickler
+from py.__.execnet.channel import Channel
 
 class ImmutablePickler:
     def __init__(self, protocol=0):
@@ -14,7 +15,6 @@
         self._updatepicklememo()
         pickler.memo = self._picklememo
         pickler.dump(obj)
-
         return f.getvalue()
 
     def loads(self, string):
@@ -39,3 +39,45 @@
             self._unpicklememo.update(dict(
                 [(x, y) 
                     for x, y in self._picklememo.itervalues()]))
+
+NO_ENDMARKER_WANTED = object()
+
+class PickleChannel(object):
+    def __init__(self, channel):
+        self._channel = channel
+        self._ipickle = ImmutablePickler() # xxx proto optimization?
+
+    def send(self, obj):
+        if not isinstance(obj, Channel):
+            pickled_obj = self._ipickle.dumps(obj)
+            self._channel.send(pickled_obj)
+        else:
+            self._channel.send(obj)
+
+    def receive(self):
+        pickled_obj = self._channel.receive()
+        return self._unpickle(pickled_obj)
+
+    def _unpickle(self, pickled_obj):
+        if isinstance(pickled_obj, self._channel.__class__):
+            return pickled_obj
+        return self._ipickle.loads(pickled_obj)
+
+    def waitclose(self):
+        return self._channel.waitclose()
+
+    def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
+        if endmarker is NO_ENDMARKER_WANTED:
+            def unpickle_callback(pickled_obj):
+                obj = self._unpickle(pickled_obj)
+                callback(obj)
+            self._channel.setcallback(unpickle_callback)
+        else:
+            uniqueendmarker = object()
+            def unpickle_callback_endmarked(pickled_obj):
+                if pickled_obj is uniqueendmarker:
+                    callback(endmarker)
+                else:
+                    obj = self._unpickle(pickled_obj)
+                    callback(obj)
+            self._channel.setcallback(unpickle_callback_endmarked, uniqueendmarker)

Modified: py/branch/event/py/test2/rsession/testing/test_pickle.py
==============================================================================
--- py/branch/event/py/test2/rsession/testing/test_pickle.py	(original)
+++ py/branch/event/py/test2/rsession/testing/test_pickle.py	Fri Aug  8 15:26:29 2008
@@ -1,10 +1,9 @@
 
 import py
-from py.__.test2.rsession.pickle import ImmutablePickler
+from py.__.test2.rsession.pickle import ImmutablePickler, PickleChannel
 
 class A: 
     pass
-
 def test_pickle_and_back_IS_same():
 
     def pickle_band_back_IS_same(obj, proto):
@@ -22,3 +21,92 @@
     for proto in 0,1,2,-1:
         for obj in {1:2}, [1,2,3], a1, a2:
             yield pickle_band_back_IS_same, obj, proto
+
+
+TESTTIMEOUT = 2.0
+class TestPickleChannelFunctional:
+    def setup_class(cls):
+        cls.gw = py.execnet.PopenGateway()
+        cls.gw.remote_init_threads(5)
+
+    def teardown_class(cls):
+        cls.gw.exit()
+
+    def test_popen_send_instance(self):
+        channel = self.gw.remote_exec("""
+            from py.__.test2.rsession.pickle import PickleChannel
+            channel = PickleChannel(channel)
+            from py.__.test2.rsession.testing.test_pickle import A
+            a1 = A()
+            a1.hello = 10
+            channel.send(a1)
+            a2 = channel.receive()
+            channel.send(a2 is a1)
+        """)
+        channel = PickleChannel(channel)
+        a_received = channel.receive()
+        assert isinstance(a_received, A)
+        assert a_received.hello == 10
+        channel.send(a_received)
+        remote_a2_is_a1 = channel.receive()
+        assert remote_a2_is_a1 
+
+
+    def test_popen_with_callback(self):
+        channel = self.gw.remote_exec("""
+            from py.__.test2.rsession.pickle import PickleChannel
+            channel = PickleChannel(channel)
+            from py.__.test2.rsession.testing.test_pickle import A
+            a1 = A()
+            a1.hello = 10
+            channel.send(a1)
+            a2 = channel.receive()
+            channel.send(a2 is a1)
+        """)
+        channel = PickleChannel(channel)
+        queue = py.std.Queue.Queue()
+        channel.setcallback(queue.put)
+        a_received = queue.get(timeout=TESTTIMEOUT)
+        assert isinstance(a_received, A)
+        assert a_received.hello == 10
+        channel.send(a_received)
+        #remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
+        #assert remote_a2_is_a1 
+
+    def test_popen_with_callback_with_endmarker(self):
+        channel = self.gw.remote_exec("""
+            from py.__.test2.rsession.pickle import PickleChannel
+            channel = PickleChannel(channel)
+            from py.__.test2.rsession.testing.test_pickle import A
+            a1 = A()
+            a1.hello = 10
+            channel.send(a1)
+            a2 = channel.receive()
+            channel.send(a2 is a1)
+        """)
+        channel = PickleChannel(channel)
+        queue = py.std.Queue.Queue()
+        channel.setcallback(queue.put, endmarker=-1)
+          
+        a_received = queue.get(timeout=TESTTIMEOUT)
+        assert isinstance(a_received, A)
+        assert a_received.hello == 10
+        channel.send(a_received)
+        remote_a2_is_a1 = queue.get(timeout=TESTTIMEOUT)
+        assert remote_a2_is_a1 
+        endmarker = queue.get(timeout=TESTTIMEOUT)
+        assert endmarker == -1
+
+    def test_popen_with_newchannel(self):
+        channel = self.gw.remote_exec("""
+            from py.__.test2.rsession.pickle import PickleChannel
+            channel = PickleChannel(channel)
+            newchannel = channel.receive()
+            newchannel.send(42)
+        """)
+        channel = PickleChannel(channel)
+        newchannel = self.gw.newchannel()
+        channel.send(newchannel)
+        channel.waitclose()
+        res = newchannel.receive()
+        assert res == 42



More information about the pytest-commit mailing list