[py-svn] r57546 - in py/trunk/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Thu Aug 21 14:04:46 CEST 2008

Author: hpk
Date: Thu Aug 21 14:04:43 2008
New Revision: 57546

* channels now also provide makefile(mode) with mode='r', which
  returns a file-like object offering read/readline/close methods.
* added and refined crash and finalization tests

Modified: py/trunk/py/execnet/channel.py
--- py/trunk/py/execnet/channel.py	(original)
+++ py/trunk/py/execnet/channel.py	Thu Aug 21 14:04:43 2008
@@ -103,13 +103,16 @@
         return self._closed
     def makefile(self, mode='w', proxyclose=False):
-        """ return a file-like object.  Only supported mode right
-            now is 'w' for binary writes.  If you want to have
-            a subsequent file.close() mean to close the channel
-            as well, then pass proxyclose=True. 
+        """ return a file-like object.  
+            mode: 'w' for binary writes, 'r' for binary reads 
+            proxyclose: set to true if you want to have a 
+            subsequent file.close() automatically close the channel. 
-        assert mode == 'w', "mode %r not availabe" %(mode,)
-        return ChannelFile(channel=self, proxyclose=proxyclose)
+        if mode == "w":
+            return ChannelFileWrite(channel=self, proxyclose=proxyclose)
+        elif mode == "r":
+            return ChannelFileRead(channel=self, proxyclose=proxyclose)
+        raise ValueError("mode %r not availabe" %(mode,))
     def close(self, error=None):
         """ close down this channel on both sides. """
@@ -299,18 +302,11 @@
         for id in self._callbacks.keys():
-class ChannelFile:
+class ChannelFile(object):
     def __init__(self, channel, proxyclose=True):
         self.channel = channel
         self._proxyclose = proxyclose 
-    def write(self, out):
-        self.channel.send(out)
-    def flush(self):
-        pass
     def close(self):
         if self._proxyclose: 
@@ -319,3 +315,38 @@
         state = self.channel.isclosed() and 'closed' or 'open'
         return '<ChannelFile %d %s>' %(self.channel.id, state) 
+class ChannelFileWrite(ChannelFile):
+    def write(self, out):
+        self.channel.send(out)
+    def flush(self):
+        pass
+class ChannelFileRead(ChannelFile):
+    def __init__(self, channel, proxyclose=True):
+        super(ChannelFileRead, self).__init__(channel, proxyclose)
+        self._buffer = ""
+    def read(self, n):
+        while len(self._buffer) < n:
+            try:
+                self._buffer += self.channel.receive()
+            except EOFError:
+                self.close() 
+                break
+        ret = self._buffer[:n]
+        self._buffer = self._buffer[n:]
+        return ret 
+    def readline(self):
+        i = self._buffer.find("\n")
+        if i != -1:
+            return self.read(i+1)
+        line = self.read(len(self._buffer)+1)
+        while line and line[-1] != "\n":
+            c = self.read(1)
+            if not c:
+                break
+            line += c
+        return line

Modified: py/trunk/py/execnet/testing/test_gateway.py
--- py/trunk/py/execnet/testing/test_gateway.py	(original)
+++ py/trunk/py/execnet/testing/test_gateway.py	Thu Aug 21 14:04:43 2008
@@ -74,6 +74,11 @@
         channel = self.fac.new()
         py.test.raises(IOError, channel.waitclose, timeout=0.01)
+    def test_channel_makefile_incompatmode(self):
+        channel = self.fac.new()
+        py.test.raises(ValueError, 'channel.makefile("rw")')
 class PopenGatewayTestSetup:
     def setup_class(cls):
         cls.gw = py.execnet.PopenGateway()
@@ -291,6 +296,19 @@
         assert isinstance(l[2], channel.__class__) 
         assert l[3] == 999
+    def test_channel_endmarker_callback_error(self):
+        from Queue import Queue
+        q = Queue()
+        channel = self.gw.remote_exec(source='''
+            raise ValueError()
+        ''') 
+        channel.setcallback(q.put, endmarker=999)
+        val = q.get(TESTTIMEOUT)
+        assert val == 999
+        err = channel._getremoteerror()
+        assert err
+        assert str(err).find("ValueError") != -1
     def test_remote_redirect_stdout(self): 
         out = py.std.StringIO.StringIO() 
         handle = self.gw._remote_redirect(stdout=out) 
@@ -315,7 +333,7 @@
             s = subl[0]
             assert s.strip() == str(i)
-    def test_channel_file(self): 
+    def test_channel_file_write(self): 
         channel = self.gw.remote_exec("""
             f = channel.makefile() 
             print >>f, "hello world" 
@@ -344,6 +362,43 @@
         assert first.strip() == 'hello world' 
         py.test.raises(EOFError, channel.receive)
+    def test_channel_file_read(self): 
+        channel = self.gw.remote_exec("""
+            f = channel.makefile(mode='r') 
+            s = f.read(2)
+            channel.send(s) 
+            s = f.read(5)
+            channel.send(s) 
+        """)
+        channel.send("xyabcde")
+        s1 = channel.receive()
+        s2 = channel.receive()
+        assert s1 == "xy" 
+        assert s2 == "abcde"
+    def test_channel_file_read_empty(self): 
+        channel = self.gw.remote_exec("pass") 
+        f = channel.makefile(mode="r") 
+        s = f.read(3) 
+        assert s == ""
+        s = f.read(5) 
+        assert s == ""
+    def test_channel_file_readline_remote(self): 
+        channel = self.gw.remote_exec("""
+            channel.send('123\\n45')
+        """)
+        channel.waitclose(TESTTIMEOUT)
+        f = channel.makefile(mode="r") 
+        s = f.readline()
+        assert s == "123\n"
+        s = f.readline()
+        assert s == "45"
+    def test_channel_makefile_incompatmode(self):
+        channel = self.gw.newchannel()
+        py.test.raises(ValueError, 'channel.makefile("rw")')
     def test_confusion_from_os_write_stdout(self):
         channel = self.gw.remote_exec("""
             import os
@@ -383,7 +438,26 @@
         text = c1.receive()
         assert text.find("execution disallowed") != -1 
+def test_channel_endmarker_remote_killterm():
+    gw = py.execnet.PopenGateway()
+    try:
+        from Queue import Queue
+        q = Queue()
+        channel = gw.remote_exec(source='''
+            import os
+            os.kill(os.getpid(), 15)
+        ''') 
+        channel.setcallback(q.put, endmarker=999)
+        val = q.get(TESTTIMEOUT)
+        assert val == 999
+        err = channel._getremoteerror()
+    finally:
+        gw.exit()
+    py.test.skip("provide information on causes/signals "
+                 "of dying remote gateways")
 #class TestBlockingIssues: 
 #    def test_join_blocked_execution_gateway(self): 
 #        gateway = py.execnet.PopenGateway() 
@@ -437,29 +511,44 @@
             ret = channel.receive()
             assert ret == 42
-    def disabled_test_remote_is_killed_while_sending(self):
+    def test_waitclose_on_remote_killed(self):
+        py.test.skip("fix needed: dying remote process does not cause waitclose() to fail")
+        if not hasattr(py.std.os, 'kill'):
+            py.test.skip("no os.kill")
         gw = py.execnet.PopenGateway()
         channel = gw.remote_exec("""
             import os
             import time
-            channel.send(os.getppid())
             while 1:
-                channel.send('#'*1000)
-            time.sleep(10)
+                channel.send("#" * 100)
-        parent = channel.receive()
-        remote = channel.receive()
-        assert parent == os.getpid()
-        time.sleep(0.5) 
-        os.kill(remote, signal.SIGKILL)
-        time.sleep(1)
-        channel.waitclose(TESTTIMEOUT)
+        remotepid = channel.receive()
+        os.kill(remotepid, 9)
+        py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
+        py.test.raises(EOFError, channel.send, None)
         py.test.raises(EOFError, channel.receive)
-        #channel.waitclose(TESTTIMEOUT)
-        #channel.send('#')
+def test_endmarker_delivery_on_remote_killterm():
+    if not hasattr(py.std.os, 'kill'):
+        py.test.skip("no os.kill()")
+    gw = py.execnet.PopenGateway()
+    try:
+        from Queue import Queue
+        q = Queue()
+        channel = gw.remote_exec(source='''
+            import os
+            os.kill(os.getpid(), 15)
+        ''') 
+        channel.setcallback(q.put, endmarker=999)
+        val = q.get(TESTTIMEOUT)
+        assert val == 999
+        err = channel._getremoteerror()
+    finally:
+        gw.exit()
+    py.test.skip("provide information on causes/signals "
+                 "of dying remote gateways")
 class SocketGatewaySetup:
     def setup_class(cls):
@@ -516,4 +605,7 @@
     py.test.raises(IOError, gw.remote_init_threads, 3)
+def test_nodebug():
+    from py.__.execnet import gateway
+    assert not gateway.debug

More information about the pytest-commit mailing list