[py-svn] r10854 - in py/dist/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Tue Apr 19 14:35:47 CEST 2005


Author: hpk
Date: Tue Apr 19 14:35:47 2005
New Revision: 10854

Modified:
   py/dist/py/execnet/channel.py
   py/dist/py/execnet/gateway.py
   py/dist/py/execnet/register.py
   py/dist/py/execnet/testing/test_gateway.py
Log:
making execnet work on remote sides that don't have a py lib 


Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py	(original)
+++ py/dist/py/execnet/channel.py	Tue Apr 19 14:35:47 2005
@@ -42,13 +42,13 @@
     #
     # internal methods, called from the receiver thread 
     #
-    def _do_close(self, finalitem=EOFError()):
+    def _do_close(self, stickyerror=EOFError()):
         if self.id in self.gateway.channelfactory:
             del self.gateway.channelfactory[self.id]
-        self._finalitem = finalitem
+        self._stickyerror = stickyerror
         #for x in self._depchannel:
         #    x._close()
-        self._items.put(finalitem)
+        self._items.put(ENDMARKER) 
         self._closeevent.set()
 
 
@@ -105,23 +105,25 @@
         return chan
 
     def waitclose(self, timeout):
-        """ wait until this channel is closed.  Note that a closed
-        channel may still hold items that can be received or
-        send. Also note that exceptions from the other side will be
-        reraised as channel.RemoteError exceptions containing
-        a textual representation of the remote traceback.
+        """ wait until this channel is closed.  A closed
+        channel may still hold receivable items.  waitclose()
+        reraises exceptions from executing code on the other 
+        side as channel.RemoteErrors containing a textual 
+        representation of the remote traceback.
         """
         self._closeevent.wait(timeout=timeout)
         if not self._closeevent.isSet():
             raise IOError, "Timeout"
-        if isinstance(self._finalitem, self.RemoteError):
-            raise self._finalitem
+        if isinstance(self._stickyerror, self.RemoteError):
+            raise self._stickyerror
 
     def send(self, item):
         """sends the given item to the other side of the channel,
         possibly blocking if the sender queue is full.
         Note that an item needs to be marshallable.
         """
+        if self.isclosed(): 
+            raise IOError, "cannot send via %r anymore" %(self,) 
         if isinstance(item, Channel):
             data = Message.CHANNEL_NEW(self.id, item.id)
         else:
@@ -138,10 +140,11 @@
         if self._callback: 
             raise IOError("calling receive() on channel with callback")
         x = self._items.get()
-        if isinstance(x, EOFError):
-            self._items.put(x) 
-            raise x
-        return x
+        if x is ENDMARKER: 
+            self._items.put(x)  # for other receivers 
+            raise self._stickyerror 
+        else: 
+            return x
     
     def __iter__(self):
         return self 
@@ -156,6 +159,8 @@
 # helpers
 #
 
+ENDMARKER = object() 
+
 class ChannelFactory(object):
     def __init__(self, gateway, startcount=1):
         self._dict = dict()

Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py	(original)
+++ py/dist/py/execnet/gateway.py	Tue Apr 19 14:35:47 2005
@@ -7,7 +7,7 @@
 
 
 # XXX the following line should not be here
-if 'Message' not in globals(): 
+if 'ThreadOut' not in globals(): 
     import py 
     from py.code import Source
     from py.__impl__.execnet.channel import ChannelFactory, Channel
@@ -16,10 +16,8 @@
     WorkerPool = py._thread.WorkerPool 
     NamedThreadPool = py._thread.NamedThreadPool 
 
-assert Message and ChannelFactory, "Import/Configuration Error"
-
 import os
-debug = open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
+debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
 
 sysex = (KeyboardInterrupt, SystemExit)
 

Modified: py/dist/py/execnet/register.py
==============================================================================
--- py/dist/py/execnet/register.py	(original)
+++ py/dist/py/execnet/register.py	Tue Apr 19 14:35:47 2005
@@ -12,12 +12,12 @@
 #     in a sufficient version 
 
 startup_modules = [
+    'py.__impl__.thread.io', 
+    'py.__impl__.thread.pool', 
     'py.__impl__.execnet.inputoutput', 
     'py.__impl__.execnet.gateway', 
-    'py.__impl__.execnet.channel', 
     'py.__impl__.execnet.message', 
-    'py.__impl__.thread.io', 
-    'py.__impl__.thread.pool', 
+    'py.__impl__.execnet.channel', 
 ]
 
 def getsource(dottedname): 

Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py	(original)
+++ py/dist/py/execnet/testing/test_gateway.py	Tue Apr 19 14:35:47 2005
@@ -84,6 +84,11 @@
         channel = self.gw.remote_exec('pass')
         channel.waitclose(timeout=1.0)
 
+    def test_remote_exec_error_after_close(self):
+        channel = self.gw.remote_exec('pass')
+        channel.waitclose(timeout=1.0)
+        py.test.raises(IOError, channel.send, 0)
+
     def test_remote_exec_channel_anonymous(self):
         channel = self.gw.remote_exec('''
                     obj = channel.receive()



More information about the pytest-commit mailing list