[py-svn] r10848 - in py/dist/py: execnet execnet/testing test/terminal

hpk at codespeak.net hpk at codespeak.net
Tue Apr 19 13:46:31 CEST 2005


Author: hpk
Date: Tue Apr 19 13:46:31 2005
New Revision: 10848

Modified:
   py/dist/py/execnet/channel.py
   py/dist/py/execnet/gateway.py
   py/dist/py/execnet/message.py
   py/dist/py/execnet/testing/test_gateway.py
   py/dist/py/test/terminal/remote.py
Log:
refactoring/reshuffling execnet a bit


Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py	(original)
+++ py/dist/py/execnet/channel.py	Tue Apr 19 13:46:31 2005
@@ -26,12 +26,52 @@
         flag = self.isclosed() and "closed" or "open"
         return "<Channel id=%d %s>" % (self.id, flag)
 
+    #
+    # internal methods, called from the receiver thread 
+    #
+    def _do_close(self, finalitem=EOFError()):
+        if self.id in self.gateway.channelfactory:
+            del self.gateway.channelfactory[self.id]
+        self._finalitem = finalitem
+        #for x in self._depchannel:
+        #    x._close()
+        self._items.put(finalitem)
+        self._closeevent.set()
+
+
+    def _do_receivechannel(self, newid):
+        """ receive a remotely created new (sub)channel. """
+        newchannel = Channel(self.gateway, newid)
+        self.gateway.channelfactory[newid] = newchannel
+        #self._depchannel.append(newchannel)
+        self._items.put(newchannel)
+
+    def _do_receivedata(self, data): 
+        if self._callback is not None: 
+            self.gateway._dispatchcallback(self._callback, data) 
+        else: 
+            self._items.put(data) 
+
+    def _do_scheduleexec(self, sourcetask): 
+        self.gateway._scheduleexec(channel=self, sourcetask=sourcetask) 
+
+    #
+    # public API for channel objects 
+    #
     def isclosed(self):
+        """ return True if the channel is closed. A closed 
+            channel may still hold items. 
+        """ 
         return self._closeevent.isSet()
 
-    def open(self, mode='w'):
+    def makefile(self, mode='w', proxyclose=True):
+        """ return a file-like object.  Only supported mode right
+            now is 'w' for binary writes. By default, closing 
+            the file will close the channel.  Pass proxyclose=False 
+            if you want to ignore file.close()s. 
+        """ 
         assert mode == 'w'
-        return ChannelFile(self)
+        return ChannelFile(channel=self, proxyclose=proxyclose)
 
     def close(self, error=None):
         """ close down this channel on both sides. """
@@ -41,38 +81,16 @@
                 put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
             else:
                 put(Message.CHANNEL_CLOSE(self.id))
-            self._close()
+            self._do_close()
 
     def remote_exec(self, source, stdout=None, stderr=None): 
         self.gateway._remote_exec(self, source, stdout, stderr) 
 
-    def _close(self, finalitem=EOFError()):
-        if self.id in self.gateway.channelfactory:
-            del self.gateway.channelfactory[self.id]
-        self._finalitem = finalitem
-        #for x in self._depchannel:
-        #    x._close()
-        self._items.put(finalitem)
-        self._closeevent.set()
-
     def newchannel(self):
-        """ return a new channel. """
+        """ return a new independent channel."""
         chan = self.gateway.channelfactory.new()
         return chan
 
-    def _receivechannel(self, newid):
-        """ receive a remotely created new (sub)channel. """
-        newchannel = Channel(self.gateway, newid)
-        self.gateway.channelfactory[newid] = newchannel
-        #self._depchannel.append(newchannel)
-        self._items.put(newchannel)
-
-    def _receivedata(self, data): 
-        if self._callback is not None: 
-            self.gateway._dispatchcallback(self._callback, data) 
-        else: 
-            self._items.put(data) 
-
     def waitclose(self, timeout):
         """ wait until this channel is closed.  Note that a closed
         channel may still hold items that can be received or
@@ -146,11 +164,7 @@
             self._lock.release()
 
     def __contains__(self, key):
-        self._lock.acquire()
-        try:
-            return key in self._dict
-        finally:
-            self._lock.release()
+        return key in self._dict
 
     def values(self):
         self._lock.acquire()
@@ -160,11 +174,8 @@
             self._lock.release()
 
     def __getitem__(self, key):
-        self._lock.acquire()
-        try:
-            return self._dict[key]
-        finally:
-            self._lock.release()
+        return self._dict[key]
+
     def __setitem__(self, key, value):
         self._lock.acquire()
         try:
@@ -180,21 +191,24 @@
 
 
 class ChannelFile:
-    def __init__(self, channel):
+    def __init__(self, channel, proxyclose=True):
         self.channel = channel
+        self._proxyclose = proxyclose 
 
     def write(self, out):
         if self.channel.isclosed():
-            return
+            return 
+            # XXX: raise IOError, "channel %r already closed" %(self.channel,)
         self.channel.send(out)
 
     def flush(self):
         pass
 
     def close(self):
-        self.channel.close()
+        if self._proxyclose: 
+            self.channel.close()
 
     def __repr__(self):
         state = self.channel.isclosed() and 'closed' or 'open'
-        return '<ChannelFile %d %s>' %(self.channel.id, state)
+        return '<ChannelFile %d %s>' %(self.channel.id, state) 
 

Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py	(original)
+++ py/dist/py/execnet/gateway.py	Tue Apr 19 13:46:31 2005
@@ -19,7 +19,7 @@
 assert Message and ChannelFactory, "Import/Configuration Error"
 
 import os
-debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
+debug = open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
 
 sysex = (KeyboardInterrupt, SystemExit)
 

Modified: py/dist/py/execnet/message.py
==============================================================================
--- py/dist/py/execnet/message.py	(original)
+++ py/dist/py/execnet/message.py	Tue Apr 19 13:46:31 2005
@@ -65,7 +65,7 @@
             # executes in receiver thread 
             gateway._stopexec()
             for x in gateway.channelfactory.values():
-                x._close()
+                x._do_close()
             gateway._outgoing.put(self.STOP_RECEIVING())
             raise SystemExit
         def post_sent(self, gateway, excinfo=None):
@@ -81,37 +81,40 @@
             # already. With sockets closing it would raise
             # a Transport Not Connected exception
             for x in gateway.channelfactory.values():
-                x._close()
+                x._do_close()
             raise SystemExit
         def post_sent(self, gateway, excinfo=None):
+            # after we have sent STOP_RECEIVING we don't
+            # want to write anything anymore. 
             gateway.io.close_write()
             raise SystemExit
 
     class CHANNEL_OPEN(Message):
         def received(self, gateway):
             channel = gateway.channelfactory.new(self.channelid)
-            gateway._scheduleexec(channel, self.data)
+            channel._do_scheduleexec(self.data) 
 
     class CHANNEL_NEW(Message):
         def received(self, gateway):
             newid = self.data
             channel = gateway.channelfactory[self.channelid]
-            channel._receivechannel(newid)
+            channel._do_receivechannel(newid)
 
     class CHANNEL_DATA(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._receivedata(self.data) 
+            channel._do_receivedata(self.data) 
 
     class CHANNEL_CLOSE(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._close()
+            channel._do_close()
 
     class CHANNEL_CLOSE_ERROR(Message):
         def received(self, gateway):
             channel = gateway.channelfactory[self.channelid]
-            channel._close(gateway.RemoteError(self.data))
+            channel._do_close(gateway.RemoteError(self.data))
+
     classes = [x for x in locals().values() if hasattr(x, '__bases__')]
     classes.sort(lambda x,y : cmp(x.__name__, y.__name__))
     i = 0

Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py	(original)
+++ py/dist/py/execnet/testing/test_gateway.py	Tue Apr 19 13:46:31 2005
@@ -77,12 +77,12 @@
 
 class BasicRemoteExecution:
     def test_correct_setup(self):
-        for x in 'sender', 'receiver': # , 'executor': 
+        for x in 'sender', 'receiver':  
             assert self.gw.pool.getstarted(x) 
 
     def test_remote_exec_waitclose(self):
         channel = self.gw.remote_exec('pass')
-        channel.waitclose(timeout=3.0)
+        channel.waitclose(timeout=1.0)
 
     def test_remote_exec_channel_anonymous(self):
         channel = self.gw.remote_exec('''
@@ -111,14 +111,14 @@
         assert x == 42
         py.test.raises(gateway.RemoteError, channel.receive)
 
-    def test_channel_close(self):
+    def test_channel__do_close(self):
         channel = self.gw.channelfactory.new()
-        channel._close()
+        channel._do_close()
         channel.waitclose(0.1)
 
-    def test_channel_close_error(self):
+    def test_channel__do_close_error(self):
         channel = self.gw.channelfactory.new()
-        channel._close(gateway.RemoteError("error"))
+        channel._do_close(gateway.RemoteError("error"))
         py.test.raises(gateway.RemoteError, channel.waitclose, 0.01)
 
     def test_channel_iter(self):
@@ -206,6 +206,17 @@
             s = subl[0]
             assert s.strip() == str(i)
 
+    def future_test_channel_file(self): 
+        self.gw.remote_exec("""
+            f = channel.makefile() 
+            print >>f, "hello world" 
+            f.close() 
+            channel.send(42) 
+        """)
+        for x in channel.receive(): 
+            print x 
+        assert 0
+
 class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
     #disabled = True
     def test_many_popen(self):

Modified: py/dist/py/test/terminal/remote.py
==============================================================================
--- py/dist/py/test/terminal/remote.py	(original)
+++ py/dist/py/test/terminal/remote.py	Tue Apr 19 13:46:31 2005
@@ -1,6 +1,5 @@
 from __future__ import generators
 import py
-from py.__impl__.execnet.channel import ChannelFile 
 from py.__impl__.test.terminal.out import getout 
 import sys
 



More information about the pytest-commit mailing list