[py-svn] r17658 - in py/branch/monday/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Mon Sep 19 15:50:58 CEST 2005
Author: hpk
Date: Mon Sep 19 15:50:57 2005
New Revision: 17658
Modified:
py/branch/monday/py/execnet/channel.py
py/branch/monday/py/execnet/gateway.py
py/branch/monday/py/execnet/testing/test_gateway.py
Log:
(arigo,hpk,jan)
- removed channel argument from gateway.remote_exec
- removed receiver arg from newchannel()
- introduced new channel.setcallback(callback)
that sends all received items to the callback
Modified: py/branch/monday/py/execnet/channel.py
==============================================================================
--- py/branch/monday/py/execnet/channel.py (original)
+++ py/branch/monday/py/execnet/channel.py Mon Sep 19 15:50:57 2005
@@ -24,18 +24,44 @@
"""Communication channel between two possibly remote threads of code. """
RemoteError = RemoteError
- def __init__(self, gateway, id, has_callback=False):
+ def __init__(self, gateway, id):
assert isinstance(id, int)
self.gateway = gateway
self.id = id
- if has_callback:
- self._items = None
- else:
- self._items = Queue.Queue()
+ self._items = Queue.Queue()
self._closed = False
self._receiveclosed = threading.Event()
self._remoteerrors = []
+ def setcallback(self, callback):
+ queue = self._items
+ lock = self.gateway.channelfactory._receivelock
+ lock.acquire()
+ try:
+ _callbacks = self.gateway.channelfactory._callbacks
+ if _callbacks.setdefault(self.id, callback) is not callback:
+ raise IOError("%r has callback already registered" %(self,))
+ self._items = None
+ while 1:
+ try:
+ olditem = queue.get(block=False)
+ except Queue.Empty:
+ break
+ else:
+ if olditem is ENDMARKER:
+ queue.put(olditem)
+ break
+ else:
+ callback(olditem)
+ if self._closed or self._receiveclosed.isSet():
+ # no need to keep a callback
+ try:
+ del _callbacks[self.id]
+ except KeyError:
+ pass
+ finally:
+ lock.release()
+
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
return "<Channel id=%d %s>" % (self.id, flag)
@@ -100,8 +126,9 @@
self._remoteerrors.append(error)
self._closed = True # --> "closed"
self._receiveclosed.set()
- if self._items is not None:
- self._items.put(ENDMARKER)
+ queue = self._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self.gateway.channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout=None):
@@ -139,11 +166,12 @@
reraised as channel.RemoteError exceptions containing
a textual representation of the remote traceback.
"""
- if self._items is None:
+ queue = self._items
+ if queue is None:
raise IOError("calling receive() on channel with receiver callback")
- x = self._items.get()
+ x = queue.get()
if x is ENDMARKER:
- self._items.put(x) # for other receivers
+ queue.put(x) # for other receivers
raise self._getremoteerror() or EOFError()
else:
return x
@@ -170,20 +198,18 @@
self._channels = weakref.WeakValueDictionary()
self._callbacks = {}
self._writelock = threading.Lock()
+ self._receivelock = threading.RLock()
self.gateway = gateway
self.count = startcount
- def new(self, id=None, receiver=None):
+ def new(self, id=None):
""" create a new Channel with 'id' (or create new id if None). """
self._writelock.acquire()
try:
if id is None:
id = self.count
self.count += 2
- has_callback = receiver is not None
- if has_callback:
- self._callbacks[id] = receiver
- channel = Channel(self.gateway, id, has_callback)
+ channel = Channel(self.gateway, id)
self._channels[id] = channel
return channel
finally:
@@ -217,8 +243,9 @@
channel._remoteerrors.append(remoteerror)
channel._closed = True # --> "closed"
channel._receiveclosed.set()
- if channel._items is not None:
- channel._items.put(ENDMARKER)
+ queue = channel._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_last_message(self, id):
@@ -229,21 +256,27 @@
else:
# state transition: if "opened", change to "sendonly"
channel._receiveclosed.set()
- if channel._items is not None:
- channel._items.put(ENDMARKER)
+ queue = channel._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_receive(self, id, data):
# executes in receiver thread
- callback = self._callbacks.get(id)
- if callback is not None:
- callback(data) # even if channel may be already closed
- else:
- channel = self._channels.get(id)
- if channel is None or channel._items is None:
- pass # drop data
+ self._receivelock.acquire()
+ try:
+ callback = self._callbacks.get(id)
+ if callback is not None:
+ callback(data) # even if channel may be already closed
else:
- channel._items.put(data)
+ channel = self._channels.get(id)
+ queue = channel and channel._items
+ if queue is None:
+ pass # drop data
+ else:
+ queue.put(data)
+ finally:
+ self._receivelock.release()
def _finished_receiving(self):
for id in self._channels.keys():
Modified: py/branch/monday/py/execnet/gateway.py
==============================================================================
--- py/branch/monday/py/execnet/gateway.py (original)
+++ py/branch/monday/py/execnet/gateway.py Mon Sep 19 15:50:57 2005
@@ -137,7 +137,7 @@
channel.close()
return close
- def thread_executor(self, channel, (source, outid, errid, autoclose)):
+ def thread_executor(self, channel, (source, outid, errid)):
""" worker thread to execute source objects from the execution queue. """
from sys import exc_info
try:
@@ -159,15 +159,7 @@
channel.close(errortext)
self.trace(errortext)
else:
- if autoclose:
- channel.close()
- else:
- # the channel should usually be closed by Channel.__del__.
- # Give it a better chance now.
- try:
- del loc['channel']
- except KeyError:
- pass
+ channel.close()
def _local_schedulexec(self, channel, sourcetask):
self.trace("dispatching exec")
@@ -179,7 +171,8 @@
if hasattr(callback, 'write'):
callback = callback.write
assert callable(callback)
- chan = self.newchannel(receiver=callback)
+ chan = self.newchannel()
+ chan.setcallback(callback)
return chan.id
# _____________________________________________________________________
@@ -187,18 +180,14 @@
# High Level Interface
# _____________________________________________________________________
#
- def newchannel(self, receiver=None):
- """ return new channel object. If a 'receiver' callback is provided
- it will be invoked on each received item. You cannot call
- receive() anymore on such a channel.
- """
- return self.channelfactory.new(receiver=receiver)
+ def newchannel(self):
+ """ return new channel object. """
+ return self.channelfactory.new()
- def remote_exec(self, source, stdout=None, stderr=None, channel=None):
+ def remote_exec(self, source, stdout=None, stderr=None):
""" return channel object for communicating with the asynchronously
executing 'source' code which will have a corresponding 'channel'
- object in its executing namespace. If a channel object is not
- provided a new channel will be created.
+ object in its executing namespace.
"""
try:
source = str(Source(source))
@@ -208,26 +197,23 @@
source = str(py.code.Source(source))
except ImportError:
pass
- if channel is None:
- channel = self.newchannel()
- autoclose = True
- else:
- autoclose = False
+ channel = self.newchannel()
outid = self._newredirectchannelid(stdout)
errid = self._newredirectchannelid(stderr)
self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
- (source, outid, errid, autoclose)))
+ (source, outid, errid)))
return channel
def remote_redirect(self, stdout=None, stderr=None):
- """ return a handle representing a redirection of of remote
+ """ return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close()
the redirection will be reverted.
"""
clist = []
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
- outchannel = self.newchannel(receiver=getattr(out, 'write', out))
+ outchannel = self.newchannel()
+ outchannel.setcallback(getattr(out, 'write', out))
channel = self.remote_exec("""
import sys
outchannel = channel.receive()
Modified: py/branch/monday/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/monday/py/execnet/testing/test_gateway.py (original)
+++ py/branch/monday/py/execnet/testing/test_gateway.py Mon Sep 19 15:50:57 2005
@@ -162,41 +162,76 @@
def test_channel_receiver_callback(self):
l = []
- channel = self.gw.newchannel(receiver=l.append)
- self.gw.remote_exec(channel=channel, source='''
+ #channel = self.gw.newchannel(receiver=l.append)
+ channel = self.gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
+ channel.setcallback(callback=l.append)
+ py.test.raises(IOError, channel.receive)
channel.waitclose(1.0)
assert len(l) == 3
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
+ def test_channel_callback_after_receive(self):
+ l = []
+ channel = self.gw.remote_exec(source='''
+ channel.send(42)
+ channel.send(13)
+ channel.send(channel.gateway.newchannel())
+ ''')
+ x = channel.receive()
+ assert x == 42
+ channel.setcallback(callback=l.append)
+ py.test.raises(IOError, channel.receive)
+ channel.waitclose(1.0)
+ assert len(l) == 2
+ assert l[0] == 13
+ assert isinstance(l[1], channel.__class__)
+
+ def test_waiting_for_callbacks(self):
+ l = []
+ def callback(msg):
+ import time; time.sleep(0.2)
+ l.append(msg)
+ channel = self.gw.remote_exec(source='''
+ channel.send(42)
+ ''')
+ channel.setcallback(callback)
+ channel.waitclose(1.0)
+ assert l == [42]
+
def test_channel_callback_stays_active(self, earlyfree=True):
# with 'earlyfree==True', this tests the "sendonly" channel state.
l = []
- channel = self.gw.newchannel(receiver=l.append)
- self.gw.remote_exec(channel=channel, source='''
+ channel = self.gw.remote_exec(source='''
import thread, time
- def producer(channel):
+ def producer(subchannel):
for i in range(5):
time.sleep(0.15)
- channel.send(i*100)
- thread.start_new_thread(producer, (channel,))
+ subchannel.send(i*100)
+ channel2 = channel.receive()
+ thread.start_new_thread(producer, (channel2,))
+ del channel2
''')
+ subchannel = self.gw.newchannel()
+ subchannel.setcallback(l.append)
+ channel.send(subchannel)
if earlyfree:
- channel = None
+ subchannel = None
counter = 100
while len(l) < 5:
- if channel and channel.isclosed():
+ if subchannel and subchannel.isclosed():
break
counter -= 1
+ print counter
if not counter:
- py.test.fail("timed out waiting for the answer[%d]" % i)
+ py.test.fail("timed out waiting for the answer[%d]" % len(l))
time.sleep(0.04) # busy-wait
assert l == [0, 100, 200, 300, 400]
- return channel
+ return subchannel
def test_channel_callback_remote_freed(self):
channel = self.test_channel_callback_stays_active(False)
More information about the pytest-commit
mailing list