[py-svn] r10848 - in py/dist/py: execnet execnet/testing test/terminal
hpk at codespeak.net
hpk at codespeak.net
Tue Apr 19 13:46:31 CEST 2005
Author: hpk
Date: Tue Apr 19 13:46:31 2005
New Revision: 10848
Modified:
py/dist/py/execnet/channel.py
py/dist/py/execnet/gateway.py
py/dist/py/execnet/message.py
py/dist/py/execnet/testing/test_gateway.py
py/dist/py/test/terminal/remote.py
Log:
refactoring/reshuffling execnet a bit
Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py (original)
+++ py/dist/py/execnet/channel.py Tue Apr 19 13:46:31 2005
@@ -26,12 +26,52 @@
flag = self.isclosed() and "closed" or "open"
return "<Channel id=%d %s>" % (self.id, flag)
+ #
+ # internal methods, called from the receiver thread
+ #
+ def _do_close(self, finalitem=EOFError()):
+ if self.id in self.gateway.channelfactory:
+ del self.gateway.channelfactory[self.id]
+ self._finalitem = finalitem
+ #for x in self._depchannel:
+ # x._close()
+ self._items.put(finalitem)
+ self._closeevent.set()
+
+
+ def _do_receivechannel(self, newid):
+ """ receive a remotely created new (sub)channel. """
+ newchannel = Channel(self.gateway, newid)
+ self.gateway.channelfactory[newid] = newchannel
+ #self._depchannel.append(newchannel)
+ self._items.put(newchannel)
+
+ def _do_receivedata(self, data):
+ if self._callback is not None:
+ self.gateway._dispatchcallback(self._callback, data)
+ else:
+ self._items.put(data)
+
+ def _do_scheduleexec(self, sourcetask):
+ self.gateway._scheduleexec(channel=self, sourcetask=sourcetask)
+
+ #
+ # public API for channel objects
+ #
def isclosed(self):
+ """ return True if the channel is closed. A closed
+ channel may still hold items.
+ """
return self._closeevent.isSet()
- def open(self, mode='w'):
+ def makefile(self, mode='w', proxyclose=True):
+ """ return a file-like object. Only supported mode right
+ now is 'w' for binary writes. By default, closing
+ the file will close the channel. Pass proxyclose=False
+ if you want to ignore file.close()s.
+ """
assert mode == 'w'
- return ChannelFile(self)
+ return ChannelFile(channel=self, proxyclose=proxyclose)
def close(self, error=None):
""" close down this channel on both sides. """
@@ -41,38 +81,16 @@
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
else:
put(Message.CHANNEL_CLOSE(self.id))
- self._close()
+ self._do_close()
def remote_exec(self, source, stdout=None, stderr=None):
self.gateway._remote_exec(self, source, stdout, stderr)
- def _close(self, finalitem=EOFError()):
- if self.id in self.gateway.channelfactory:
- del self.gateway.channelfactory[self.id]
- self._finalitem = finalitem
- #for x in self._depchannel:
- # x._close()
- self._items.put(finalitem)
- self._closeevent.set()
-
def newchannel(self):
- """ return a new channel. """
+ """ return a new independent channel."""
chan = self.gateway.channelfactory.new()
return chan
- def _receivechannel(self, newid):
- """ receive a remotely created new (sub)channel. """
- newchannel = Channel(self.gateway, newid)
- self.gateway.channelfactory[newid] = newchannel
- #self._depchannel.append(newchannel)
- self._items.put(newchannel)
-
- def _receivedata(self, data):
- if self._callback is not None:
- self.gateway._dispatchcallback(self._callback, data)
- else:
- self._items.put(data)
-
def waitclose(self, timeout):
""" wait until this channel is closed. Note that a closed
channel may still hold items that can be received or
@@ -146,11 +164,7 @@
self._lock.release()
def __contains__(self, key):
- self._lock.acquire()
- try:
- return key in self._dict
- finally:
- self._lock.release()
+ return key in self._dict
def values(self):
self._lock.acquire()
@@ -160,11 +174,8 @@
self._lock.release()
def __getitem__(self, key):
- self._lock.acquire()
- try:
- return self._dict[key]
- finally:
- self._lock.release()
+ return self._dict[key]
+
def __setitem__(self, key, value):
self._lock.acquire()
try:
@@ -180,21 +191,24 @@
class ChannelFile:
- def __init__(self, channel):
+ def __init__(self, channel, proxyclose=True):
self.channel = channel
+ self._proxyclose = proxyclose
def write(self, out):
if self.channel.isclosed():
- return
+ return
+ # XXX: raise IOError, "channel %r already closed" %(self.channel,)
self.channel.send(out)
def flush(self):
pass
def close(self):
- self.channel.close()
+ if self._proxyclose:
+ self.channel.close()
def __repr__(self):
state = self.channel.isclosed() and 'closed' or 'open'
- return '<ChannelFile %d %s>' %(self.channel.id, state)
+ return '<ChannelFile %d %s>' %(self.channel.id, state)
Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py (original)
+++ py/dist/py/execnet/gateway.py Tue Apr 19 13:46:31 2005
@@ -19,7 +19,7 @@
assert Message and ChannelFactory, "Import/Configuration Error"
import os
-debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
+debug = open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
sysex = (KeyboardInterrupt, SystemExit)
Modified: py/dist/py/execnet/message.py
==============================================================================
--- py/dist/py/execnet/message.py (original)
+++ py/dist/py/execnet/message.py Tue Apr 19 13:46:31 2005
@@ -65,7 +65,7 @@
# executes in receiver thread
gateway._stopexec()
for x in gateway.channelfactory.values():
- x._close()
+ x._do_close()
gateway._outgoing.put(self.STOP_RECEIVING())
raise SystemExit
def post_sent(self, gateway, excinfo=None):
@@ -81,37 +81,40 @@
# already. With sockets closing it would raise
# a Transport Not Connected exception
for x in gateway.channelfactory.values():
- x._close()
+ x._do_close()
raise SystemExit
def post_sent(self, gateway, excinfo=None):
+ # after we sent STOP_RECEIVING we don't
+ # want to write anything more anymore.
gateway.io.close_write()
raise SystemExit
class CHANNEL_OPEN(Message):
def received(self, gateway):
channel = gateway.channelfactory.new(self.channelid)
- gateway._scheduleexec(channel, self.data)
+ channel._do_scheduleexec(self.data)
class CHANNEL_NEW(Message):
def received(self, gateway):
newid = self.data
channel = gateway.channelfactory[self.channelid]
- channel._receivechannel(newid)
+ channel._do_receivechannel(newid)
class CHANNEL_DATA(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._receivedata(self.data)
+ channel._do_receivedata(self.data)
class CHANNEL_CLOSE(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._close()
+ channel._do_close()
class CHANNEL_CLOSE_ERROR(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._close(gateway.RemoteError(self.data))
+ channel._do_close(gateway.RemoteError(self.data))
+
classes = [x for x in locals().values() if hasattr(x, '__bases__')]
classes.sort(lambda x,y : cmp(x.__name__, y.__name__))
i = 0
Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py (original)
+++ py/dist/py/execnet/testing/test_gateway.py Tue Apr 19 13:46:31 2005
@@ -77,12 +77,12 @@
class BasicRemoteExecution:
def test_correct_setup(self):
- for x in 'sender', 'receiver': # , 'executor':
+ for x in 'sender', 'receiver':
assert self.gw.pool.getstarted(x)
def test_remote_exec_waitclose(self):
channel = self.gw.remote_exec('pass')
- channel.waitclose(timeout=3.0)
+ channel.waitclose(timeout=1.0)
def test_remote_exec_channel_anonymous(self):
channel = self.gw.remote_exec('''
@@ -111,14 +111,14 @@
assert x == 42
py.test.raises(gateway.RemoteError, channel.receive)
- def test_channel_close(self):
+ def test_channel__do_close(self):
channel = self.gw.channelfactory.new()
- channel._close()
+ channel._do_close()
channel.waitclose(0.1)
- def test_channel_close_error(self):
+ def test_channel__do_close_error(self):
channel = self.gw.channelfactory.new()
- channel._close(gateway.RemoteError("error"))
+ channel._do_close(gateway.RemoteError("error"))
py.test.raises(gateway.RemoteError, channel.waitclose, 0.01)
def test_channel_iter(self):
@@ -206,6 +206,17 @@
s = subl[0]
assert s.strip() == str(i)
+ def future_test_channel_file(self):
+ self.gw.remote_exec("""
+ f = channel.makefile()
+ print >>f, "hello world"
+ f.close()
+ channel.send(42)
+ """)
+ for x in channel.receive():
+ print x
+ assert 0
+
class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
#disabled = True
def test_many_popen(self):
Modified: py/dist/py/test/terminal/remote.py
==============================================================================
--- py/dist/py/test/terminal/remote.py (original)
+++ py/dist/py/test/terminal/remote.py Tue Apr 19 13:46:31 2005
@@ -1,6 +1,5 @@
from __future__ import generators
import py
-from py.__impl__.execnet.channel import ChannelFile
from py.__impl__.test.terminal.out import getout
import sys
More information about the pytest-commit
mailing list