[py-svn] r19696 - in py/dist/py: code code/testing execnet execnet/testing test/terminal test/tkinter thread thread/testing

jan at codespeak.net jan at codespeak.net
Thu Nov 10 02:06:39 CET 2005


Author: jan
Date: Thu Nov 10 02:06:34 2005
New Revision: 19696

Modified:
   py/dist/py/code/testing/test_excinfo.py
   py/dist/py/code/traceback2.py
   py/dist/py/execnet/channel.py
   py/dist/py/execnet/gateway.py
   py/dist/py/execnet/testing/test_gateway.py
   py/dist/py/test/terminal/remote.py
   py/dist/py/test/terminal/terminal.py
   py/dist/py/test/tkinter/backend.py
   py/dist/py/thread/pool.py
   py/dist/py/thread/testing/test_pool.py
Log:
Merged monday branch (-r17654:17687) into dist.

Important changes:
- removed channel argument from gateway.remote_exec
- removed receiver arg from newchannel()
- introduced new channel.setcallback(callback)
  that sends all received items to the callback



Modified: py/dist/py/code/testing/test_excinfo.py
==============================================================================
--- py/dist/py/code/testing/test_excinfo.py	(original)
+++ py/dist/py/code/testing/test_excinfo.py	Thu Nov 10 02:06:34 2005
@@ -30,11 +30,18 @@
 
 # testchain for getentries test below
 def f():
+    #
     raise ValueError
+    #
 def g():
+    #
+    __tracebackhide__ = True
     f()
+    #
 def h():
+    #
     g()
+    #
 
 class TestTraceback_f_g_h: 
     def setup_method(self, method): 
@@ -80,6 +87,66 @@
             assert s.startswith("def xyz():\n    try:")
             assert s.endswith("except somenoname:") 
 
+    def test_traceback_cut(self):
+        co = py.code.Code(f)
+        path, firstlineno = co.path, co.firstlineno 
+        traceback = self.excinfo.traceback 
+        newtraceback = traceback.cut(path=path, firstlineno=firstlineno)
+        assert len(newtraceback) == 1
+        newtraceback = traceback.cut(path=path, lineno=firstlineno+2)
+        assert len(newtraceback) == 1
+
+    def test_traceback_filter(self):
+        traceback = self.excinfo.traceback
+        ntraceback = traceback.filter()
+        assert len(ntraceback) == len(traceback) - 1
+
+    def test_traceback_recursion_index(self):
+        def f(n):
+            if n < 10:
+                n += 1
+            f(n)
+        excinfo = py.test.raises(RuntimeError, f, 8)
+        traceback = excinfo.traceback 
+        recindex = traceback.recursionindex() 
+        assert recindex == 3
+
+    def test_traceback_getcrashentry(self):
+        def i():
+            __tracebackhide__ = True
+            raise ValueError 
+        def h():
+            i()
+        def g():
+            __tracebackhide__ = True
+            h()
+        def f():
+            g()
+
+        excinfo = py.test.raises(ValueError, f)
+        tb = excinfo.traceback
+        entry = tb.getcrashentry()
+        co = py.code.Code(h)
+        assert entry.frame.code.path == co.path
+        assert entry.lineno == co.firstlineno + 1 
+        assert entry.frame.code.name == 'h'
+
+    def test_traceback_getcrashentry_empty(self):
+        def g():
+            __tracebackhide__ = True
+            raise ValueError 
+        def f():
+            __tracebackhide__ = True
+            g()
+
+        excinfo = py.test.raises(ValueError, f)
+        tb = excinfo.traceback
+        entry = tb.getcrashentry()
+        co = py.code.Code(g)
+        assert entry.frame.code.path == co.path
+        assert entry.lineno == co.firstlineno + 2
+        assert entry.frame.code.name == 'g'
+        
     #def test_traceback_display_func(self):
     #    tb = self.excinfo.traceback 
     #    for x in tb: 

Modified: py/dist/py/code/traceback2.py
==============================================================================
--- py/dist/py/code/traceback2.py	(original)
+++ py/dist/py/code/traceback2.py	Thu Nov 10 02:06:34 2005
@@ -52,6 +52,14 @@
                 break 
         return source[start:end]
 
+    def ishidden(self):
+        try: 
+            return self.frame.eval("__tracebackhide__") 
+        except (SystemExit, KeyboardInterrupt): 
+            raise
+        except:
+            return False 
+
     def __str__(self): 
         try: 
             fn = str(self.path) 
@@ -65,12 +73,58 @@
     Entry = TracebackEntry 
 
     def __init__(self, tb):
-        def f(cur): 
-            while cur is not None: 
-                yield self.Entry(cur)
-                cur = cur.tb_next 
-        list.__init__(self, f(tb)) 
+        if hasattr(tb, 'tb_next'):
+            def f(cur): 
+                while cur is not None: 
+                    yield self.Entry(cur)
+                    cur = cur.tb_next 
+            list.__init__(self, f(tb)) 
+        else:
+            list.__init__(self, tb)
+
+    def cut(self, path=None, lineno=None, firstlineno=None):
+        for x in self:
+            if ((path is None or x.frame.code.path == path) and
+                (lineno is None or x.lineno == lineno) and
+                (firstlineno is None or x.frame.code.firstlineno == firstlineno)):
+                return Traceback(x._rawentry)
+        return self
+
+    def __getitem__(self, key):
+        val = super(Traceback, self).__getitem__(key)
+        if isinstance(key, slice):
+            val = self.__class__(val)
+        return val 
 
+    def filter(self, fn=lambda x: not x.ishidden()):
+        return Traceback(filter(fn, self))
+
+    def getcrashentry(self):
+        """ return last non-hidden traceback entry that lead
+        to the exception of a traceback. 
+        """
+        tb = self.filter()
+        if not tb:
+            tb = self
+        return tb[-1]
+
+    def recursionindex(self):
+        cache = {}
+        for i, entry in py.builtin.enumerate(self):
+            key = entry.frame.code.path, entry.frame.lineno 
+            #print "checking for recursion at", key
+            l = cache.setdefault(key, [])
+            if l: 
+                f = entry.frame
+                loc = f.f_locals
+                for otherloc in l: 
+                    if f.is_true(f.eval(co_equal, 
+                        __recursioncache_locals_1=loc,
+                        __recursioncache_locals_2=otherloc)):
+                        return i 
+            l.append(entry.frame.f_locals)
+        return None
+        
 #    def __str__(self): 
 #        for x in self
 #        l = []
@@ -78,3 +132,6 @@
 #            l.append(entry.display()) 
 #        return "".join(l) 
 
+
+co_equal = compile('__recursioncache_locals_1 == __recursioncache_locals_2',
+                   '?', 'eval')

Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py	(original)
+++ py/dist/py/execnet/channel.py	Thu Nov 10 02:06:34 2005
@@ -24,18 +24,44 @@
     """Communication channel between two possibly remote threads of code. """
     RemoteError = RemoteError
 
-    def __init__(self, gateway, id, has_callback=False):
+    def __init__(self, gateway, id):
         assert isinstance(id, int)
         self.gateway = gateway
         self.id = id
-        if has_callback:
-            self._items = None
-        else:
-            self._items = Queue.Queue()
+        self._items = Queue.Queue()
         self._closed = False
         self._receiveclosed = threading.Event()
         self._remoteerrors = []
 
+    def setcallback(self, callback):
+        queue = self._items
+        lock = self.gateway.channelfactory._receivelock
+        lock.acquire()
+        try:
+            _callbacks = self.gateway.channelfactory._callbacks
+            if _callbacks.setdefault(self.id, callback) is not callback:
+                raise IOError("%r has callback already registered" %(self,))
+            self._items = None
+            while 1:
+                try:
+                    olditem = queue.get(block=False)
+                except Queue.Empty:
+                    break
+                else:
+                    if olditem is ENDMARKER:
+                        queue.put(olditem)
+                        break
+                    else:
+                        callback(olditem)
+            if self._closed or self._receiveclosed.isSet():
+                # no need to keep a callback
+                try:
+                    del _callbacks[self.id]
+                except KeyError:
+                    pass
+        finally:
+            lock.release()
+         
     def __repr__(self):
         flag = self.isclosed() and "closed" or "open"
         return "<Channel id=%d %s>" % (self.id, flag)
@@ -100,8 +126,9 @@
                 self._remoteerrors.append(error)
             self._closed = True         # --> "closed"
             self._receiveclosed.set()
-            if self._items is not None:
-                self._items.put(ENDMARKER)
+            queue = self._items
+            if queue is not None:
+                queue.put(ENDMARKER)
             self.gateway.channelfactory._no_longer_opened(self.id)
 
     def waitclose(self, timeout=None):
@@ -139,11 +166,12 @@
         reraised as channel.RemoteError exceptions containing
         a textual representation of the remote traceback.
         """
-        if self._items is None:
+        queue = self._items
+        if queue is None:
             raise IOError("calling receive() on channel with receiver callback")
-        x = self._items.get()
+        x = queue.get()
         if x is ENDMARKER: 
-            self._items.put(x)  # for other receivers 
+            queue.put(x)  # for other receivers 
             raise self._getremoteerror() or EOFError()
         else: 
             return x
@@ -170,20 +198,18 @@
         self._channels = weakref.WeakValueDictionary()
         self._callbacks = {}
         self._writelock = threading.Lock()
+        self._receivelock = threading.RLock()
         self.gateway = gateway
         self.count = startcount
 
-    def new(self, id=None, receiver=None):
+    def new(self, id=None):
         """ create a new Channel with 'id' (or create new id if None). """
         self._writelock.acquire()
         try:
             if id is None:
                 id = self.count
                 self.count += 2
-            has_callback = receiver is not None
-            if has_callback:
-                self._callbacks[id] = receiver
-            channel = Channel(self.gateway, id, has_callback)
+            channel = Channel(self.gateway, id)
             self._channels[id] = channel
             return channel
         finally:
@@ -217,8 +243,9 @@
                 channel._remoteerrors.append(remoteerror)
             channel._closed = True          # --> "closed"
             channel._receiveclosed.set()
-            if channel._items is not None:
-                channel._items.put(ENDMARKER)
+            queue = channel._items
+            if queue is not None:
+                queue.put(ENDMARKER)
         self._no_longer_opened(id)
 
     def _local_last_message(self, id):
@@ -229,21 +256,27 @@
         else:
             # state transition: if "opened", change to "sendonly"
             channel._receiveclosed.set()
-            if channel._items is not None:
-                channel._items.put(ENDMARKER)
+            queue = channel._items
+            if queue is not None:
+                queue.put(ENDMARKER)
         self._no_longer_opened(id)
 
     def _local_receive(self, id, data): 
         # executes in receiver thread
-        callback = self._callbacks.get(id)
-        if callback is not None:
-            callback(data)   # even if channel may be already closed
-        else:
-            channel = self._channels.get(id)
-            if channel is None or channel._items is None:
-                pass    # drop data
+        self._receivelock.acquire()
+        try:
+            callback = self._callbacks.get(id)
+            if callback is not None:
+                callback(data)   # even if channel may be already closed
             else:
-                channel._items.put(data)
+                channel = self._channels.get(id)
+                queue = channel and channel._items
+                if queue is None:
+                    pass    # drop data
+                else:
+                    queue.put(data)
+        finally:
+            self._receivelock.release()
 
     def _finished_receiving(self):
         for id in self._channels.keys():

Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py	(original)
+++ py/dist/py/execnet/gateway.py	Thu Nov 10 02:06:34 2005
@@ -15,7 +15,7 @@
 # to the other side.  Yes, it is fragile but we have a 
 # few tests that try to catch when we mess up. 
 
-# XXX the following line should not be here
+# XXX the following lines should not be here
 if 'ThreadOut' not in globals(): 
     import py 
     from py.code import Source
@@ -50,11 +50,13 @@
                                     sender = self.thread_sender)
 
     def __repr__(self): 
-        R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving" 
-        S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
+        r = (len(self.pool.getstarted('receiver'))
+             and "receiving" or "not receiving")
+        s = (len(self.pool.getstarted('sender')) 
+             and "sending" or "not sending")
         i = len(self.channelfactory.channels())
-        return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__, 
-                                                   R, S, i) 
+        return "<%s %s/%s (%d active channels)>" %(
+                self.__class__.__name__, r, s, i) 
 
 ##    def _local_trystopexec(self):
 ##        self._execpool.shutdown() 
@@ -71,6 +73,7 @@
                 raise
             except:
                 traceback.print_exc()
+
     def traceex(self, excinfo):
         try:
             l = traceback.format_exception(*excinfo)
@@ -137,7 +140,7 @@
                 channel.close() 
         return close 
 
-    def thread_executor(self, channel, (source, outid, errid, autoclose)):
+    def thread_executor(self, channel, (source, outid, errid)):
         """ worker thread to execute source objects from the execution queue. """
         from sys import exc_info
         try:
@@ -159,15 +162,7 @@
             channel.close(errortext)
             self.trace(errortext)
         else:
-            if autoclose:
-                channel.close()
-            else:
-                # the channel should usually be closed by Channel.__del__.
-                # Give it a better chance now.
-                try:
-                    del loc['channel']
-                except KeyError:
-                    pass
+            channel.close()
 
     def _local_schedulexec(self, channel, sourcetask): 
         self.trace("dispatching exec")
@@ -179,7 +174,8 @@
         if hasattr(callback, 'write'): 
             callback = callback.write 
         assert callable(callback) 
-        chan = self.newchannel(receiver=callback) 
+        chan = self.newchannel()
+        chan.setcallback(callback)
         return chan.id 
 
     # _____________________________________________________________________
@@ -187,18 +183,14 @@
     # High Level Interface
     # _____________________________________________________________________
     #
-    def newchannel(self, receiver=None): 
-        """ return new channel object. If a 'receiver' callback is provided 
-            it will be invoked on each received item. You cannot call 
-            receive() anymore on such a channel. 
-        """ 
-        return self.channelfactory.new(receiver=receiver) 
+    def newchannel(self): 
+        """ return new channel object.  """ 
+        return self.channelfactory.new()
 
-    def remote_exec(self, source, stdout=None, stderr=None, channel=None): 
+    def remote_exec(self, source, stdout=None, stderr=None): 
         """ return channel object for communicating with the asynchronously
             executing 'source' code which will have a corresponding 'channel'
-            object in its executing namespace. If a channel object is not
-            provided a new channel will be created.  
+            object in its executing namespace. 
         """
         try:
             source = str(Source(source))
@@ -208,26 +200,23 @@
                 source = str(py.code.Source(source))
             except ImportError: 
                 pass 
-        if channel is None: 
-            channel = self.newchannel() 
-            autoclose = True
-        else:
-            autoclose = False
+        channel = self.newchannel() 
         outid = self._newredirectchannelid(stdout) 
         errid = self._newredirectchannelid(stderr) 
         self._outgoing.put(Message.CHANNEL_OPEN(channel.id, 
-                               (source, outid, errid, autoclose)))
+                               (source, outid, errid)))
         return channel 
 
     def remote_redirect(self, stdout=None, stderr=None): 
-        """ return a handle representing a redirection of of remote 
+        """ return a handle representing a redirection of a remote 
             end's stdout to a local file object.  with handle.close() 
             the redirection will be reverted.   
         """ 
         clist = []
         for name, out in ('stdout', stdout), ('stderr', stderr): 
             if out: 
-                outchannel = self.newchannel(receiver=getattr(out, 'write', out))
+                outchannel = self.newchannel()
+                outchannel.setcallback(getattr(out, 'write', out))
                 channel = self.remote_exec(""" 
                     import sys
                     outchannel = channel.receive() 

Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py	(original)
+++ py/dist/py/execnet/testing/test_gateway.py	Thu Nov 10 02:06:34 2005
@@ -69,6 +69,9 @@
         for x in 'sender', 'receiver':  
             assert self.gw.pool.getstarted(x) 
 
+    def test_repr_doesnt_crash(self):
+        assert isinstance(repr(self), str)
+
     def test_correct_setup_no_py(self):
         channel = self.gw.remote_exec("""
             import sys
@@ -162,41 +165,76 @@
 
     def test_channel_receiver_callback(self): 
         l = []
-        channel = self.gw.newchannel(receiver=l.append)
-        self.gw.remote_exec(channel=channel, source='''
+        #channel = self.gw.newchannel(receiver=l.append)
+        channel = self.gw.remote_exec(source='''
             channel.send(42)
             channel.send(13)
             channel.send(channel.gateway.newchannel())
             ''') 
+        channel.setcallback(callback=l.append)
+        py.test.raises(IOError, channel.receive)
         channel.waitclose(1.0) 
         assert len(l) == 3
         assert l[:2] == [42,13]
         assert isinstance(l[2], channel.__class__) 
 
+    def test_channel_callback_after_receive(self):
+        l = []
+        channel = self.gw.remote_exec(source='''
+            channel.send(42)
+            channel.send(13)
+            channel.send(channel.gateway.newchannel())
+            ''') 
+        x = channel.receive()
+        assert x == 42
+        channel.setcallback(callback=l.append)
+        py.test.raises(IOError, channel.receive)
+        channel.waitclose(1.0) 
+        assert len(l) == 2
+        assert l[0] == 13
+        assert isinstance(l[1], channel.__class__) 
+
+    def test_waiting_for_callbacks(self):
+        l = []
+        def callback(msg):
+            import time; time.sleep(0.2)
+            l.append(msg)
+        channel = self.gw.remote_exec(source='''
+            channel.send(42)
+            ''')
+        channel.setcallback(callback)
+        channel.waitclose(1.0)
+        assert l == [42]
+
     def test_channel_callback_stays_active(self, earlyfree=True):
         # with 'earlyfree==True', this tests the "sendonly" channel state.
         l = []
-        channel = self.gw.newchannel(receiver=l.append)
-        self.gw.remote_exec(channel=channel, source='''
+        channel = self.gw.remote_exec(source='''
             import thread, time
-            def producer(channel):
+            def producer(subchannel):
                 for i in range(5):
                     time.sleep(0.15)
-                    channel.send(i*100)
-            thread.start_new_thread(producer, (channel,))
+                    subchannel.send(i*100)
+            channel2 = channel.receive()
+            thread.start_new_thread(producer, (channel2,))
+            del channel2
             ''')
+        subchannel = self.gw.newchannel()
+        subchannel.setcallback(l.append)
+        channel.send(subchannel)
         if earlyfree:
-            channel = None
+            subchannel = None
         counter = 100
         while len(l) < 5:
-            if channel and channel.isclosed():
+            if subchannel and subchannel.isclosed():
                 break
             counter -= 1
+            print counter
             if not counter:
-                py.test.fail("timed out waiting for the answer[%d]" % i)
+                py.test.fail("timed out waiting for the answer[%d]" % len(l))
             time.sleep(0.04)   # busy-wait
         assert l == [0, 100, 200, 300, 400]
-        return channel
+        return subchannel
 
     def test_channel_callback_remote_freed(self):
         channel = self.test_channel_callback_stays_active(False)

Modified: py/dist/py/test/terminal/remote.py
==============================================================================
--- py/dist/py/test/terminal/remote.py	(original)
+++ py/dist/py/test/terminal/remote.py	Thu Nov 10 02:06:34 2005
@@ -21,8 +21,8 @@
                 print "# WARN: race condition on", path
         else:
             if oldstat:
-               if oldstat.st_mtime != curstat.st_mtime or \
-                  oldstat.st_size != curstat.st_size:
+               if oldstat.mtime != curstat.mtime or \
+                  oldstat.size != curstat.size:
                     changed = True
                     print "# MODIFIED", path
             else:

Modified: py/dist/py/test/terminal/terminal.py
==============================================================================
--- py/dist/py/test/terminal/terminal.py	(original)
+++ py/dist/py/test/terminal/terminal.py	Thu Nov 10 02:06:34 2005
@@ -255,26 +255,34 @@
         #print "repr_failures sees item", item
         #print "repr_failures sees traceback"
         #py.std.pprint.pprint(traceback)
-        if item: 
-            self.cut_traceback(traceback, item) 
+        if item and not self.config.option.fulltrace: 
+            path, firstlineno = item.getpathlineno()
+            ntraceback = traceback.cut(path=path, firstlineno=firstlineno)
+            if ntraceback == traceback:
+                ntraceback = ntraceback.cut(path=path)
+            traceback = ntraceback.filter()
         if not traceback: 
             self.out.line("empty traceback from item %r" % (item,)) 
             return
         last = traceback[-1]
         first = traceback[0]
-        recursioncache = {}
-        for entry in traceback: 
+        if not self.config.option.nomagic and excinfo.errisinstance(RuntimeError):
+            recursionindex = traceback.recursionindex()
+        else:
+            recursionindex = None
+        for index, entry in py.builtin.enumerate(traceback): 
             if entry == first: 
                 if item: 
                     self.repr_failure_info(item, entry) 
                     self.out.line()
             else: 
                 self.out.line("")
+            source = self.getentrysource(entry)
             if entry == last: 
-                indent = self.repr_source(entry, 'E') 
-                self.repr_failure_explanation(excinfo, indent) 
+                self.repr_source(source, 'E')
+                self.repr_failure_explanation(excinfo, source) 
             else:
-                self.repr_source(entry, '>') 
+                self.repr_source(source, '>') 
             self.out.line("") 
             self.out.line("[%s:%d]" %(entry.frame.code.path, entry.lineno+1))  
             self.repr_locals(entry) 
@@ -287,27 +295,11 @@
                 self.out.sep("_")
             else: 
                 self.out.sep("_ ")
-                if not self.config.option.nomagic and excinfo.errisinstance(RuntimeError) \
-                       and self.isrecursive(entry, recursioncache): 
+                if index == recursionindex:
                     self.out.line("Recursion detected (same locals & position)")
                     self.out.sep("!")
                     break 
 
-    def isrecursive(self, entry, recursioncache): 
-        # recursion detection 
-        key = entry.frame.code.path, entry.frame.lineno 
-        #print "checking for recursion at", key
-        l = recursioncache.setdefault(key, [])
-        if l: 
-            f = entry.frame
-            loc = f.f_locals
-            for otherloc in l: 
-                if f.is_true(f.eval(co_equal, 
-                    __recursioncache_locals_1=loc,
-                    __recursioncache_locals_2=otherloc)):
-                    return True 
-        l.append(entry.frame.f_locals)
-        
     def repr_failure_info(self, item, entry): 
         root = item.fspath 
         modpath = item.getmodpath() 
@@ -326,62 +318,27 @@
         else:
             self.out.sep("_", "entrypoint: %s %s" %(root.basename, modpath))
 
-    def repr_source(self, entry, marker=">"): 
+    def getentrysource(self, entry):
         try: 
             source = entry.getsource() 
         except py.error.ENOENT:
-            self.out.line("[failure to get at sourcelines from %r]\n" % entry)
-        else: 
-            source = source.deindent() 
-            for line in source[:-1]: 
-                self.out.line("    " + line) 
-            lastline = source[-1]
-            self.out.line(marker + "   " + lastline) 
-            try: 
-                s = str(source.getstatement(len(source)-1))
-            except KeyboardInterrupt: 
-                raise 
-            except: 
-                #self.out.line("[failed to get last statement]\n%s" %(source,))
-                s = str(source[-1])
-            #print "XXX %r" % s
-            return 4 + (len(s) - len(s.lstrip()))
-        return 0 
-
-    def cut_traceback(self, traceback, item=None): 
-        if self.config.option.fulltrace or item is None:
-            return
-        newtraceback = traceback[:]
-        path, lineno = item.getpathlineno() 
-        for i, entry in py.builtin.enumerate(newtraceback): 
-            if entry.frame.code.path == path: 
-               last = i
-               while i < len(newtraceback)-1: 
-                    entry = newtraceback[i]
-                    next = newtraceback[i+1]
-                    if next.frame.code.path != path: 
-                        break 
-                    if entry.frame.code.firstlineno == lineno: 
-                        break 
-               del newtraceback[:i]
-               break
-        if not newtraceback: 
-            newtraceback = traceback[:]
-            
-        # get rid of all frames marked with __tracebackhide__ 
-        l = []
-        for entry in newtraceback: 
-            try: 
-                x = entry.frame.eval("__tracebackhide__") 
-            except: 
-                x = None 
-            if not x: 
-                l.append(entry) 
-        traceback[:] = l 
+            source = py.code.Source("[failure to get at sourcelines from %r]\n" % entry)
+        return source.deindent()
 
-    def repr_failure_explanation(self, excinfo, indent): 
+    def repr_source(self, source, marker=">"):
+        for line in source[:-1]: 
+            self.out.line("    " + line) 
+        lastline = source[-1]
+        self.out.line(marker + "   " + lastline) 
 
-        indent = " " * indent 
+    def repr_failure_explanation(self, excinfo, source): 
+        try: 
+            s = str(source.getstatement(len(source)-1))
+        except KeyboardInterrupt: 
+            raise 
+        except: 
+            s = str(source[-1])
+        indent = " " * (4 + (len(s) - len(s.lstrip())))
         # get the real exception information out 
         lines = excinfo.exconly(tryshort=True).split('\n') 
         self.out.line('>' + indent[:-1] + lines.pop(0)) 
@@ -435,9 +392,6 @@
                     self.out.line("%-10s =\\" % (name,))
                     py.std.pprint.pprint(value, stream=self.out)
 
-co_equal = compile('__recursioncache_locals_1 == __recursioncache_locals_2',
-                   '?', 'eval')
-
 def repr_pythonversion():
     v = py.std.sys.version_info
     try:

Modified: py/dist/py/test/tkinter/backend.py
==============================================================================
--- py/dist/py/test/tkinter/backend.py	(original)
+++ py/dist/py/test/tkinter/backend.py	Thu Nov 10 02:06:34 2005
@@ -191,8 +191,8 @@
         self.testrepository = TestRepository()
         self.reportstore = ReportStore()
         self.gateway = py.execnet.PopenGateway(config.option.executable) 
-        self.channel = self.gateway.newchannel(receiver = self.queue.put)
-        self.gateway.remote_exec(channel = self.channel, source = '''
+        #self.channel = self.gateway.newchannel(receiver = self.queue.put)
+        self.channel = self.gateway.remote_exec(source = '''
         import py
         from py.__.test.tkinter.backend import remote
 
@@ -201,6 +201,7 @@
         # why?
         channel.close()
         ''')
+        self.channel.setcallback(self.queue.put)
         self.channel.send((args, tests))
         self.waitfinish_thread = threading.Thread(target = waitfinish, args = (self.channel,))
         self.waitfinish_thread.start()

Modified: py/dist/py/thread/pool.py
==============================================================================
--- py/dist/py/thread/pool.py	(original)
+++ py/dist/py/thread/pool.py	Thu Nov 10 02:06:34 2005
@@ -3,18 +3,20 @@
 import time 
 import sys
 
+ERRORMARKER = object() 
+
 class Reply(object): 
     _excinfo = None 
     def __init__(self, task): 
         self.task = task 
         self._queue = Queue.Queue() 
 
-    def set(self, result): 
+    def _set(self, result): 
         self._queue.put(result) 
 
-    def setexcinfo(self, excinfo): 
+    def _setexcinfo(self, excinfo): 
         self._excinfo = excinfo 
-        self._queue.put(None) 
+        self._queue.put(ERRORMARKER) 
 
     def _get_with_timeout(self, timeout): 
         # taken from python2.3's Queue.get() 
@@ -32,13 +34,15 @@
                 time.sleep(delay)       #reduce CPU usage by using a sleep
 
     def get(self, timeout=None): 
+        if self._queue is None: 
+            raise EOFError("reply has already been delivered")
         if timeout is not None: 
             result = self._get_with_timeout(timeout) 
         else: 
             result = self._queue.get() 
-        excinfo = self._excinfo 
-        if excinfo: 
-            self._excinfo = None 
+        if result is ERRORMARKER: 
+            self._queue = None
+            excinfo = self._excinfo 
             raise excinfo[0], excinfo[1], excinfo[2]
         return result 
 
@@ -61,9 +65,9 @@
         except (SystemExit, KeyboardInterrupt): 
             return False
         except: 
-            reply.setexcinfo(sys.exc_info()) 
+            reply._setexcinfo(sys.exc_info()) 
         else: 
-            reply.set(result) 
+            reply._set(result) 
         # at this point, reply, task and all other local variables go away
         return True
 

Modified: py/dist/py/thread/testing/test_pool.py
==============================================================================
--- py/dist/py/thread/testing/test_pool.py	(original)
+++ py/dist/py/thread/testing/test_pool.py	Thu Nov 10 02:06:34 2005
@@ -46,6 +46,7 @@
         raise ValueError("42") 
     reply = pool.dispatch(f) 
     excinfo = py.test.raises(ValueError, "reply.get(1.0)") 
+    py.test.raises(EOFError, "reply.get(1.0)") 
 
 def test_maxthreads():
     pool = WorkerPool(maxthreads=1)



More information about the pytest-commit mailing list