[py-svn] r19696 - in py/dist/py: code code/testing execnet execnet/testing test/terminal test/tkinter thread thread/testing
jan at codespeak.net
jan at codespeak.net
Thu Nov 10 02:06:39 CET 2005
Author: jan
Date: Thu Nov 10 02:06:34 2005
New Revision: 19696
Modified:
py/dist/py/code/testing/test_excinfo.py
py/dist/py/code/traceback2.py
py/dist/py/execnet/channel.py
py/dist/py/execnet/gateway.py
py/dist/py/execnet/testing/test_gateway.py
py/dist/py/test/terminal/remote.py
py/dist/py/test/terminal/terminal.py
py/dist/py/test/tkinter/backend.py
py/dist/py/thread/pool.py
py/dist/py/thread/testing/test_pool.py
Log:
Merged monday branch (-r17654:17687) into dist.
Important changes:
- removed channel argument from gateway.remote_exec
- removed receiver argument from newchannel()
- introduced new channel.setcallback(callback)
  that sends all received items to the callback
Modified: py/dist/py/code/testing/test_excinfo.py
==============================================================================
--- py/dist/py/code/testing/test_excinfo.py (original)
+++ py/dist/py/code/testing/test_excinfo.py Thu Nov 10 02:06:34 2005
@@ -30,11 +30,18 @@
# testchain for getentries test below
def f():
+ #
raise ValueError
+ #
def g():
+ #
+ __tracebackhide__ = True
f()
+ #
def h():
+ #
g()
+ #
class TestTraceback_f_g_h:
def setup_method(self, method):
@@ -80,6 +87,66 @@
assert s.startswith("def xyz():\n try:")
assert s.endswith("except somenoname:")
+ def test_traceback_cut(self):
+ co = py.code.Code(f)
+ path, firstlineno = co.path, co.firstlineno
+ traceback = self.excinfo.traceback
+ newtraceback = traceback.cut(path=path, firstlineno=firstlineno)
+ assert len(newtraceback) == 1
+ newtraceback = traceback.cut(path=path, lineno=firstlineno+2)
+ assert len(newtraceback) == 1
+
+ def test_traceback_filter(self):
+ traceback = self.excinfo.traceback
+ ntraceback = traceback.filter()
+ assert len(ntraceback) == len(traceback) - 1
+
+ def test_traceback_recursion_index(self):
+ def f(n):
+ if n < 10:
+ n += 1
+ f(n)
+ excinfo = py.test.raises(RuntimeError, f, 8)
+ traceback = excinfo.traceback
+ recindex = traceback.recursionindex()
+ assert recindex == 3
+
+ def test_traceback_getcrashentry(self):
+ def i():
+ __tracebackhide__ = True
+ raise ValueError
+ def h():
+ i()
+ def g():
+ __tracebackhide__ = True
+ h()
+ def f():
+ g()
+
+ excinfo = py.test.raises(ValueError, f)
+ tb = excinfo.traceback
+ entry = tb.getcrashentry()
+ co = py.code.Code(h)
+ assert entry.frame.code.path == co.path
+ assert entry.lineno == co.firstlineno + 1
+ assert entry.frame.code.name == 'h'
+
+ def test_traceback_getcrashentry_empty(self):
+ def g():
+ __tracebackhide__ = True
+ raise ValueError
+ def f():
+ __tracebackhide__ = True
+ g()
+
+ excinfo = py.test.raises(ValueError, f)
+ tb = excinfo.traceback
+ entry = tb.getcrashentry()
+ co = py.code.Code(g)
+ assert entry.frame.code.path == co.path
+ assert entry.lineno == co.firstlineno + 2
+ assert entry.frame.code.name == 'g'
+
#def test_traceback_display_func(self):
# tb = self.excinfo.traceback
# for x in tb:
Modified: py/dist/py/code/traceback2.py
==============================================================================
--- py/dist/py/code/traceback2.py (original)
+++ py/dist/py/code/traceback2.py Thu Nov 10 02:06:34 2005
@@ -52,6 +52,14 @@
break
return source[start:end]
+ def ishidden(self):
+ try:
+ return self.frame.eval("__tracebackhide__")
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except:
+ return False
+
def __str__(self):
try:
fn = str(self.path)
@@ -65,12 +73,58 @@
Entry = TracebackEntry
def __init__(self, tb):
- def f(cur):
- while cur is not None:
- yield self.Entry(cur)
- cur = cur.tb_next
- list.__init__(self, f(tb))
+ if hasattr(tb, 'tb_next'):
+ def f(cur):
+ while cur is not None:
+ yield self.Entry(cur)
+ cur = cur.tb_next
+ list.__init__(self, f(tb))
+ else:
+ list.__init__(self, tb)
+
+ def cut(self, path=None, lineno=None, firstlineno=None):
+ for x in self:
+ if ((path is None or x.frame.code.path == path) and
+ (lineno is None or x.lineno == lineno) and
+ (firstlineno is None or x.frame.code.firstlineno == firstlineno)):
+ return Traceback(x._rawentry)
+ return self
+
+ def __getitem__(self, key):
+ val = super(Traceback, self).__getitem__(key)
+ if isinstance(key, slice):
+ val = self.__class__(val)
+ return val
+ def filter(self, fn=lambda x: not x.ishidden()):
+ return Traceback(filter(fn, self))
+
+ def getcrashentry(self):
+ """ return last non-hidden traceback entry that lead
+ to the exception of a traceback.
+ """
+ tb = self.filter()
+ if not tb:
+ tb = self
+ return tb[-1]
+
+ def recursionindex(self):
+ cache = {}
+ for i, entry in py.builtin.enumerate(self):
+ key = entry.frame.code.path, entry.frame.lineno
+ #print "checking for recursion at", key
+ l = cache.setdefault(key, [])
+ if l:
+ f = entry.frame
+ loc = f.f_locals
+ for otherloc in l:
+ if f.is_true(f.eval(co_equal,
+ __recursioncache_locals_1=loc,
+ __recursioncache_locals_2=otherloc)):
+ return i
+ l.append(entry.frame.f_locals)
+ return None
+
# def __str__(self):
# for x in self
# l = []
@@ -78,3 +132,6 @@
# l.append(entry.display())
# return "".join(l)
+
+co_equal = compile('__recursioncache_locals_1 == __recursioncache_locals_2',
+ '?', 'eval')
Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py (original)
+++ py/dist/py/execnet/channel.py Thu Nov 10 02:06:34 2005
@@ -24,18 +24,44 @@
"""Communication channel between two possibly remote threads of code. """
RemoteError = RemoteError
- def __init__(self, gateway, id, has_callback=False):
+ def __init__(self, gateway, id):
assert isinstance(id, int)
self.gateway = gateway
self.id = id
- if has_callback:
- self._items = None
- else:
- self._items = Queue.Queue()
+ self._items = Queue.Queue()
self._closed = False
self._receiveclosed = threading.Event()
self._remoteerrors = []
+ def setcallback(self, callback):
+ queue = self._items
+ lock = self.gateway.channelfactory._receivelock
+ lock.acquire()
+ try:
+ _callbacks = self.gateway.channelfactory._callbacks
+ if _callbacks.setdefault(self.id, callback) is not callback:
+ raise IOError("%r has callback already registered" %(self,))
+ self._items = None
+ while 1:
+ try:
+ olditem = queue.get(block=False)
+ except Queue.Empty:
+ break
+ else:
+ if olditem is ENDMARKER:
+ queue.put(olditem)
+ break
+ else:
+ callback(olditem)
+ if self._closed or self._receiveclosed.isSet():
+ # no need to keep a callback
+ try:
+ del _callbacks[self.id]
+ except KeyError:
+ pass
+ finally:
+ lock.release()
+
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
return "<Channel id=%d %s>" % (self.id, flag)
@@ -100,8 +126,9 @@
self._remoteerrors.append(error)
self._closed = True # --> "closed"
self._receiveclosed.set()
- if self._items is not None:
- self._items.put(ENDMARKER)
+ queue = self._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self.gateway.channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout=None):
@@ -139,11 +166,12 @@
reraised as channel.RemoteError exceptions containing
a textual representation of the remote traceback.
"""
- if self._items is None:
+ queue = self._items
+ if queue is None:
raise IOError("calling receive() on channel with receiver callback")
- x = self._items.get()
+ x = queue.get()
if x is ENDMARKER:
- self._items.put(x) # for other receivers
+ queue.put(x) # for other receivers
raise self._getremoteerror() or EOFError()
else:
return x
@@ -170,20 +198,18 @@
self._channels = weakref.WeakValueDictionary()
self._callbacks = {}
self._writelock = threading.Lock()
+ self._receivelock = threading.RLock()
self.gateway = gateway
self.count = startcount
- def new(self, id=None, receiver=None):
+ def new(self, id=None):
""" create a new Channel with 'id' (or create new id if None). """
self._writelock.acquire()
try:
if id is None:
id = self.count
self.count += 2
- has_callback = receiver is not None
- if has_callback:
- self._callbacks[id] = receiver
- channel = Channel(self.gateway, id, has_callback)
+ channel = Channel(self.gateway, id)
self._channels[id] = channel
return channel
finally:
@@ -217,8 +243,9 @@
channel._remoteerrors.append(remoteerror)
channel._closed = True # --> "closed"
channel._receiveclosed.set()
- if channel._items is not None:
- channel._items.put(ENDMARKER)
+ queue = channel._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_last_message(self, id):
@@ -229,21 +256,27 @@
else:
# state transition: if "opened", change to "sendonly"
channel._receiveclosed.set()
- if channel._items is not None:
- channel._items.put(ENDMARKER)
+ queue = channel._items
+ if queue is not None:
+ queue.put(ENDMARKER)
self._no_longer_opened(id)
def _local_receive(self, id, data):
# executes in receiver thread
- callback = self._callbacks.get(id)
- if callback is not None:
- callback(data) # even if channel may be already closed
- else:
- channel = self._channels.get(id)
- if channel is None or channel._items is None:
- pass # drop data
+ self._receivelock.acquire()
+ try:
+ callback = self._callbacks.get(id)
+ if callback is not None:
+ callback(data) # even if channel may be already closed
else:
- channel._items.put(data)
+ channel = self._channels.get(id)
+ queue = channel and channel._items
+ if queue is None:
+ pass # drop data
+ else:
+ queue.put(data)
+ finally:
+ self._receivelock.release()
def _finished_receiving(self):
for id in self._channels.keys():
Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py (original)
+++ py/dist/py/execnet/gateway.py Thu Nov 10 02:06:34 2005
@@ -15,7 +15,7 @@
# to the other side. Yes, it is fragile but we have a
# few tests that try to catch when we mess up.
-# XXX the following line should not be here
+# XXX the following lines should not be here
if 'ThreadOut' not in globals():
import py
from py.code import Source
@@ -50,11 +50,13 @@
sender = self.thread_sender)
def __repr__(self):
- R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving"
- S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
+ r = (len(self.pool.getstarted('receiver'))
+ and "receiving" or "not receiving")
+ s = (len(self.pool.getstarted('sender'))
+ and "sending" or "not sending")
i = len(self.channelfactory.channels())
- return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__,
- R, S, i)
+ return "<%s %s/%s (%d active channels)>" %(
+ self.__class__.__name__, r, s, i)
## def _local_trystopexec(self):
## self._execpool.shutdown()
@@ -71,6 +73,7 @@
raise
except:
traceback.print_exc()
+
def traceex(self, excinfo):
try:
l = traceback.format_exception(*excinfo)
@@ -137,7 +140,7 @@
channel.close()
return close
- def thread_executor(self, channel, (source, outid, errid, autoclose)):
+ def thread_executor(self, channel, (source, outid, errid)):
""" worker thread to execute source objects from the execution queue. """
from sys import exc_info
try:
@@ -159,15 +162,7 @@
channel.close(errortext)
self.trace(errortext)
else:
- if autoclose:
- channel.close()
- else:
- # the channel should usually be closed by Channel.__del__.
- # Give it a better chance now.
- try:
- del loc['channel']
- except KeyError:
- pass
+ channel.close()
def _local_schedulexec(self, channel, sourcetask):
self.trace("dispatching exec")
@@ -179,7 +174,8 @@
if hasattr(callback, 'write'):
callback = callback.write
assert callable(callback)
- chan = self.newchannel(receiver=callback)
+ chan = self.newchannel()
+ chan.setcallback(callback)
return chan.id
# _____________________________________________________________________
@@ -187,18 +183,14 @@
# High Level Interface
# _____________________________________________________________________
#
- def newchannel(self, receiver=None):
- """ return new channel object. If a 'receiver' callback is provided
- it will be invoked on each received item. You cannot call
- receive() anymore on such a channel.
- """
- return self.channelfactory.new(receiver=receiver)
+ def newchannel(self):
+ """ return new channel object. """
+ return self.channelfactory.new()
- def remote_exec(self, source, stdout=None, stderr=None, channel=None):
+ def remote_exec(self, source, stdout=None, stderr=None):
""" return channel object for communicating with the asynchronously
executing 'source' code which will have a corresponding 'channel'
- object in its executing namespace. If a channel object is not
- provided a new channel will be created.
+ object in its executing namespace.
"""
try:
source = str(Source(source))
@@ -208,26 +200,23 @@
source = str(py.code.Source(source))
except ImportError:
pass
- if channel is None:
- channel = self.newchannel()
- autoclose = True
- else:
- autoclose = False
+ channel = self.newchannel()
outid = self._newredirectchannelid(stdout)
errid = self._newredirectchannelid(stderr)
self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
- (source, outid, errid, autoclose)))
+ (source, outid, errid)))
return channel
def remote_redirect(self, stdout=None, stderr=None):
- """ return a handle representing a redirection of of remote
+ """ return a handle representing a redirection of a remote
end's stdout to a local file object. with handle.close()
the redirection will be reverted.
"""
clist = []
for name, out in ('stdout', stdout), ('stderr', stderr):
if out:
- outchannel = self.newchannel(receiver=getattr(out, 'write', out))
+ outchannel = self.newchannel()
+ outchannel.setcallback(getattr(out, 'write', out))
channel = self.remote_exec("""
import sys
outchannel = channel.receive()
Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py (original)
+++ py/dist/py/execnet/testing/test_gateway.py Thu Nov 10 02:06:34 2005
@@ -69,6 +69,9 @@
for x in 'sender', 'receiver':
assert self.gw.pool.getstarted(x)
+ def test_repr_doesnt_crash(self):
+ assert isinstance(repr(self), str)
+
def test_correct_setup_no_py(self):
channel = self.gw.remote_exec("""
import sys
@@ -162,41 +165,76 @@
def test_channel_receiver_callback(self):
l = []
- channel = self.gw.newchannel(receiver=l.append)
- self.gw.remote_exec(channel=channel, source='''
+ #channel = self.gw.newchannel(receiver=l.append)
+ channel = self.gw.remote_exec(source='''
channel.send(42)
channel.send(13)
channel.send(channel.gateway.newchannel())
''')
+ channel.setcallback(callback=l.append)
+ py.test.raises(IOError, channel.receive)
channel.waitclose(1.0)
assert len(l) == 3
assert l[:2] == [42,13]
assert isinstance(l[2], channel.__class__)
+ def test_channel_callback_after_receive(self):
+ l = []
+ channel = self.gw.remote_exec(source='''
+ channel.send(42)
+ channel.send(13)
+ channel.send(channel.gateway.newchannel())
+ ''')
+ x = channel.receive()
+ assert x == 42
+ channel.setcallback(callback=l.append)
+ py.test.raises(IOError, channel.receive)
+ channel.waitclose(1.0)
+ assert len(l) == 2
+ assert l[0] == 13
+ assert isinstance(l[1], channel.__class__)
+
+ def test_waiting_for_callbacks(self):
+ l = []
+ def callback(msg):
+ import time; time.sleep(0.2)
+ l.append(msg)
+ channel = self.gw.remote_exec(source='''
+ channel.send(42)
+ ''')
+ channel.setcallback(callback)
+ channel.waitclose(1.0)
+ assert l == [42]
+
def test_channel_callback_stays_active(self, earlyfree=True):
# with 'earlyfree==True', this tests the "sendonly" channel state.
l = []
- channel = self.gw.newchannel(receiver=l.append)
- self.gw.remote_exec(channel=channel, source='''
+ channel = self.gw.remote_exec(source='''
import thread, time
- def producer(channel):
+ def producer(subchannel):
for i in range(5):
time.sleep(0.15)
- channel.send(i*100)
- thread.start_new_thread(producer, (channel,))
+ subchannel.send(i*100)
+ channel2 = channel.receive()
+ thread.start_new_thread(producer, (channel2,))
+ del channel2
''')
+ subchannel = self.gw.newchannel()
+ subchannel.setcallback(l.append)
+ channel.send(subchannel)
if earlyfree:
- channel = None
+ subchannel = None
counter = 100
while len(l) < 5:
- if channel and channel.isclosed():
+ if subchannel and subchannel.isclosed():
break
counter -= 1
+ print counter
if not counter:
- py.test.fail("timed out waiting for the answer[%d]" % i)
+ py.test.fail("timed out waiting for the answer[%d]" % len(l))
time.sleep(0.04) # busy-wait
assert l == [0, 100, 200, 300, 400]
- return channel
+ return subchannel
def test_channel_callback_remote_freed(self):
channel = self.test_channel_callback_stays_active(False)
Modified: py/dist/py/test/terminal/remote.py
==============================================================================
--- py/dist/py/test/terminal/remote.py (original)
+++ py/dist/py/test/terminal/remote.py Thu Nov 10 02:06:34 2005
@@ -21,8 +21,8 @@
print "# WARN: race condition on", path
else:
if oldstat:
- if oldstat.st_mtime != curstat.st_mtime or \
- oldstat.st_size != curstat.st_size:
+ if oldstat.mtime != curstat.mtime or \
+ oldstat.size != curstat.size:
changed = True
print "# MODIFIED", path
else:
Modified: py/dist/py/test/terminal/terminal.py
==============================================================================
--- py/dist/py/test/terminal/terminal.py (original)
+++ py/dist/py/test/terminal/terminal.py Thu Nov 10 02:06:34 2005
@@ -255,26 +255,34 @@
#print "repr_failures sees item", item
#print "repr_failures sees traceback"
#py.std.pprint.pprint(traceback)
- if item:
- self.cut_traceback(traceback, item)
+ if item and not self.config.option.fulltrace:
+ path, firstlineno = item.getpathlineno()
+ ntraceback = traceback.cut(path=path, firstlineno=firstlineno)
+ if ntraceback == traceback:
+ ntraceback = ntraceback.cut(path=path)
+ traceback = ntraceback.filter()
if not traceback:
self.out.line("empty traceback from item %r" % (item,))
return
last = traceback[-1]
first = traceback[0]
- recursioncache = {}
- for entry in traceback:
+ if not self.config.option.nomagic and excinfo.errisinstance(RuntimeError):
+ recursionindex = traceback.recursionindex()
+ else:
+ recursionindex = None
+ for index, entry in py.builtin.enumerate(traceback):
if entry == first:
if item:
self.repr_failure_info(item, entry)
self.out.line()
else:
self.out.line("")
+ source = self.getentrysource(entry)
if entry == last:
- indent = self.repr_source(entry, 'E')
- self.repr_failure_explanation(excinfo, indent)
+ self.repr_source(source, 'E')
+ self.repr_failure_explanation(excinfo, source)
else:
- self.repr_source(entry, '>')
+ self.repr_source(source, '>')
self.out.line("")
self.out.line("[%s:%d]" %(entry.frame.code.path, entry.lineno+1))
self.repr_locals(entry)
@@ -287,27 +295,11 @@
self.out.sep("_")
else:
self.out.sep("_ ")
- if not self.config.option.nomagic and excinfo.errisinstance(RuntimeError) \
- and self.isrecursive(entry, recursioncache):
+ if index == recursionindex:
self.out.line("Recursion detected (same locals & position)")
self.out.sep("!")
break
- def isrecursive(self, entry, recursioncache):
- # recursion detection
- key = entry.frame.code.path, entry.frame.lineno
- #print "checking for recursion at", key
- l = recursioncache.setdefault(key, [])
- if l:
- f = entry.frame
- loc = f.f_locals
- for otherloc in l:
- if f.is_true(f.eval(co_equal,
- __recursioncache_locals_1=loc,
- __recursioncache_locals_2=otherloc)):
- return True
- l.append(entry.frame.f_locals)
-
def repr_failure_info(self, item, entry):
root = item.fspath
modpath = item.getmodpath()
@@ -326,62 +318,27 @@
else:
self.out.sep("_", "entrypoint: %s %s" %(root.basename, modpath))
- def repr_source(self, entry, marker=">"):
+ def getentrysource(self, entry):
try:
source = entry.getsource()
except py.error.ENOENT:
- self.out.line("[failure to get at sourcelines from %r]\n" % entry)
- else:
- source = source.deindent()
- for line in source[:-1]:
- self.out.line(" " + line)
- lastline = source[-1]
- self.out.line(marker + " " + lastline)
- try:
- s = str(source.getstatement(len(source)-1))
- except KeyboardInterrupt:
- raise
- except:
- #self.out.line("[failed to get last statement]\n%s" %(source,))
- s = str(source[-1])
- #print "XXX %r" % s
- return 4 + (len(s) - len(s.lstrip()))
- return 0
-
- def cut_traceback(self, traceback, item=None):
- if self.config.option.fulltrace or item is None:
- return
- newtraceback = traceback[:]
- path, lineno = item.getpathlineno()
- for i, entry in py.builtin.enumerate(newtraceback):
- if entry.frame.code.path == path:
- last = i
- while i < len(newtraceback)-1:
- entry = newtraceback[i]
- next = newtraceback[i+1]
- if next.frame.code.path != path:
- break
- if entry.frame.code.firstlineno == lineno:
- break
- del newtraceback[:i]
- break
- if not newtraceback:
- newtraceback = traceback[:]
-
- # get rid of all frames marked with __tracebackhide__
- l = []
- for entry in newtraceback:
- try:
- x = entry.frame.eval("__tracebackhide__")
- except:
- x = None
- if not x:
- l.append(entry)
- traceback[:] = l
+ source = py.code.Source("[failure to get at sourcelines from %r]\n" % entry)
+ return source.deindent()
- def repr_failure_explanation(self, excinfo, indent):
+ def repr_source(self, source, marker=">"):
+ for line in source[:-1]:
+ self.out.line(" " + line)
+ lastline = source[-1]
+ self.out.line(marker + " " + lastline)
- indent = " " * indent
+ def repr_failure_explanation(self, excinfo, source):
+ try:
+ s = str(source.getstatement(len(source)-1))
+ except KeyboardInterrupt:
+ raise
+ except:
+ s = str(source[-1])
+ indent = " " * (4 + (len(s) - len(s.lstrip())))
# get the real exception information out
lines = excinfo.exconly(tryshort=True).split('\n')
self.out.line('>' + indent[:-1] + lines.pop(0))
@@ -435,9 +392,6 @@
self.out.line("%-10s =\\" % (name,))
py.std.pprint.pprint(value, stream=self.out)
-co_equal = compile('__recursioncache_locals_1 == __recursioncache_locals_2',
- '?', 'eval')
-
def repr_pythonversion():
v = py.std.sys.version_info
try:
Modified: py/dist/py/test/tkinter/backend.py
==============================================================================
--- py/dist/py/test/tkinter/backend.py (original)
+++ py/dist/py/test/tkinter/backend.py Thu Nov 10 02:06:34 2005
@@ -191,8 +191,8 @@
self.testrepository = TestRepository()
self.reportstore = ReportStore()
self.gateway = py.execnet.PopenGateway(config.option.executable)
- self.channel = self.gateway.newchannel(receiver = self.queue.put)
- self.gateway.remote_exec(channel = self.channel, source = '''
+ #self.channel = self.gateway.newchannel(receiver = self.queue.put)
+ self.channel = self.gateway.remote_exec(source = '''
import py
from py.__.test.tkinter.backend import remote
@@ -201,6 +201,7 @@
# why?
channel.close()
''')
+ self.channel.setcallback(self.queue.put)
self.channel.send((args, tests))
self.waitfinish_thread = threading.Thread(target = waitfinish, args = (self.channel,))
self.waitfinish_thread.start()
Modified: py/dist/py/thread/pool.py
==============================================================================
--- py/dist/py/thread/pool.py (original)
+++ py/dist/py/thread/pool.py Thu Nov 10 02:06:34 2005
@@ -3,18 +3,20 @@
import time
import sys
+ERRORMARKER = object()
+
class Reply(object):
_excinfo = None
def __init__(self, task):
self.task = task
self._queue = Queue.Queue()
- def set(self, result):
+ def _set(self, result):
self._queue.put(result)
- def setexcinfo(self, excinfo):
+ def _setexcinfo(self, excinfo):
self._excinfo = excinfo
- self._queue.put(None)
+ self._queue.put(ERRORMARKER)
def _get_with_timeout(self, timeout):
# taken from python2.3's Queue.get()
@@ -32,13 +34,15 @@
time.sleep(delay) #reduce CPU usage by using a sleep
def get(self, timeout=None):
+ if self._queue is None:
+ raise EOFError("reply has already been delivered")
if timeout is not None:
result = self._get_with_timeout(timeout)
else:
result = self._queue.get()
- excinfo = self._excinfo
- if excinfo:
- self._excinfo = None
+ if result is ERRORMARKER:
+ self._queue = None
+ excinfo = self._excinfo
raise excinfo[0], excinfo[1], excinfo[2]
return result
@@ -61,9 +65,9 @@
except (SystemExit, KeyboardInterrupt):
return False
except:
- reply.setexcinfo(sys.exc_info())
+ reply._setexcinfo(sys.exc_info())
else:
- reply.set(result)
+ reply._set(result)
# at this point, reply, task and all other local variables go away
return True
Modified: py/dist/py/thread/testing/test_pool.py
==============================================================================
--- py/dist/py/thread/testing/test_pool.py (original)
+++ py/dist/py/thread/testing/test_pool.py Thu Nov 10 02:06:34 2005
@@ -46,6 +46,7 @@
raise ValueError("42")
reply = pool.dispatch(f)
excinfo = py.test.raises(ValueError, "reply.get(1.0)")
+ py.test.raises(EOFError, "reply.get(1.0)")
def test_maxthreads():
pool = WorkerPool(maxthreads=1)
More information about the pytest-commit mailing list