[py-svn] r10680 - in py/branch/py-collect: . execnet execnet/testing test/testing thread thread/testing

hpk at codespeak.net hpk at codespeak.net
Fri Apr 15 17:37:15 CEST 2005


Author: hpk
Date: Fri Apr 15 17:37:15 2005
New Revision: 10680

Added:
   py/branch/py-collect/thread/   (props changed)
   py/branch/py-collect/thread/__init__.py
      - copied unchanged from r10656, py/branch/py-collect/test/__init__.py
   py/branch/py-collect/thread/io.py
   py/branch/py-collect/thread/pool.py
      - copied, changed from r10656, py/branch/py-collect/execnet/gateway.py
   py/branch/py-collect/thread/testing/   (props changed)
   py/branch/py-collect/thread/testing/__init__.py
      - copied unchanged from r10656, py/branch/py-collect/test/__init__.py
   py/branch/py-collect/thread/testing/test_threadpool.py   (contents, props changed)
      - copied, changed from r10656, py/branch/py-collect/execnet/testing/test_threadpool.py
Removed:
   py/branch/py-collect/execnet/testing/test_threadpool.py
Modified:
   py/branch/py-collect/__init__.py
   py/branch/py-collect/execnet/gateway.py
   py/branch/py-collect/execnet/register.py
   py/branch/py-collect/execnet/testing/test_gateway.py
   py/branch/py-collect/test/testing/test_session.py
Log:
refactoring thread-related stuff from py.execnet to 
a new namespace: py.thread.   

However, I am going to change it to py._thread for the time
being because the design of the supplied helper classes is
still somewhat in flux.  Samuele Pedroni pointed me to the way Java
1.5 organizes threads, which might be an interesting source
(although heavily convoluted to read :-) for design
considerations. 



Modified: py/branch/py-collect/__init__.py
==============================================================================
--- py/branch/py-collect/__init__.py	(original)
+++ py/branch/py-collect/__init__.py	Fri Apr 15 17:37:15 2005
@@ -26,6 +26,11 @@
     'test.Item'              : ('./test/item.py', 'Item'),
     'test.Function'          : ('./test/item.py', 'Function'),
 
+    # thread related API 
+    'thread.WorkerPool'      : ('./thread/pool.py', 'WorkerPool'), 
+    'thread.NamedThreadPool' : ('./thread/pool.py', 'NamedThreadPool'), 
+    'thread.ThreadOut'       : ('./thread/io.py', 'ThreadOut'), 
+
     # hook into the top-level standard library 
     'std'                    : ('./misc/std.py', 'std'),
 

Modified: py/branch/py-collect/execnet/gateway.py
==============================================================================
--- py/branch/py-collect/execnet/gateway.py	(original)
+++ py/branch/py-collect/execnet/gateway.py	Fri Apr 15 17:37:15 2005
@@ -8,11 +8,14 @@
 
 
 # XXX the following line should not be here
-g = globals()
-if 'Message' not in g:
+if 'Message' not in globals(): 
+    import py 
     from py.code import Source
     from py.__impl__.execnet.channel import ChannelFactory, Channel
     from py.__impl__.execnet.message import Message
+    ThreadOut = py.thread.ThreadOut 
+    WorkerPool = py.thread.WorkerPool 
+    NamedThreadPool = py.thread.NamedThreadPool 
 
 assert Message and ChannelFactory, "Import/Configuration Error"
 
@@ -33,200 +36,10 @@
     def __repr__(self):
         return "%s: %s" %(self.__class__.__name__, self.formatted)
 
-class WorkerThread(threading.Thread): 
-    def __init__(self, pool): 
-        super(WorkerThread, self).__init__() 
-        self._queue = Queue.Queue() 
-        self._pool = pool 
-        self.setDaemon(1) 
-
-    def run(self): 
-        try: 
-            while 1: 
-                task = self._queue.get() 
-                assert self not in self._pool._ready 
-                if task is None: 
-                    break 
-                try: 
-                    func, args, kwargs = task 
-                    func(*args, **kwargs) 
-                except (SystemExit, KeyboardInterrupt): 
-                    break 
-                except: 
-                    import traceback
-                    traceback.print_exc() 
-                self._pool._ready.append(self) 
-        finally: 
-            del self._pool._alive[self]
-
-    def handle(self, task): 
-        self._queue.put(task) 
-
-    def stop(self): 
-        self._queue.put(None) 
-
-class WorkerPool(object): 
-    _shutdown = False 
-    def __init__(self, maxthreads=None): 
-        self.maxthreads = maxthreads
-        self._numthreads = 0 
-        self._ready = []
-        self._alive = {}
-
-    def dispatch(self, func, *args, **kwargs): 
-        if self._shutdown: 
-            raise IOError("WorkerPool is already shutting down") 
-        task = (func, args, kwargs) 
-        try: 
-            thread = self._ready.pop() 
-        except IndexError: # pop from empty list
-            thread = self._newthread() 
-        thread.handle(task) 
-
-    def __del__(self): 
-        self.shutdown() 
-
-    def shutdown(self, timeout=1.0): 
-        if not self._shutdown: 
-            self._shutdown = True
-            now = time.time() 
-            while self._alive: 
-                try: 
-                    thread = self._ready.pop() 
-                except IndexError: 
-                    if now + timeout < time.time(): 
-                        raise IOError("Timeout: could not shut down WorkerPool") 
-                    time.sleep(0.1) 
-                else: 
-                    thread.stop() 
-
-    def _newthread(self): 
-        if self.maxthreads: 
-            if len(self._alive) >= self.maxthreads: 
-                raise IOError("cannot create more threads, "
-                              "maxthreads=%d" % (self.maxthreads,))
-        thread = WorkerThread(self) 
-        thread.start() 
-        self._alive[thread] = True 
-        return thread 
-
-    def join(self): 
-        current = threading.currentThread()
-        for x in self._alive.keys(): 
-            if current != x: 
-                x.join()
-
-class NamedThreadPool: 
-    def __init__(self, **kw): 
-        self._namedthreads = {}
-        for name, value in kw.items(): 
-            self.start(name, value) 
-
-    def __repr__(self): 
-        return "<NamedThreadPool %r>" %(self._namedthreads) 
-
-    def get(self, name=None): 
-        if name is None: 
-            l = []
-            for x in self._namedthreads.values(): 
-                l.extend(x) 
-            return l 
-        else: 
-            return self._namedthreads.get(name, [])
-
-    def getstarted(self, name=None): 
-        return [t for t in self.get(name) if t.isAlive()]
-
-    def prunestopped(self, name=None): 
-        if name is None: 
-            for name in self.names(): 
-                self.prunestopped(name) 
-        else: 
-            self._namedthreads[name] = self.getstarted(name) 
-
-    def names(self): 
-        return self._namedthreads.keys() 
-
-    def start(self, name, func): 
-        l = self._namedthreads.setdefault(name, []) 
-        thread = threading.Thread(name="%s%d" % (name, len(l)), 
-                                  target=func) 
-        thread.start() 
-        l.append(thread) 
-
-class ThreadOut(object): 
-    def __new__(cls, obj, attrname): 
-        """ Divert file output to per-thread writefuncs. 
-            the given obj and attrname describe the destination 
-            of the file.  
-        """ 
-        current = getattr(obj, attrname)
-        if isinstance(current, cls): 
-            current._used += 1
-            return current 
-        self = object.__new__(cls) 
-        self._tid2out = {}
-        self._used = 1 
-        self._oldout = getattr(obj, attrname) 
-        self._defaultwriter = self._oldout.write 
-        self._address = (obj, attrname) 
-        setattr(obj, attrname, self) 
-        return self 
-
-    def setdefaultwriter(self, writefunc): 
-        self._defaultwriter = writefunc 
-
-    def resetdefault(self): 
-        self._defaultwriter = self._oldout.write
-
-    def softspace(): 
-        def fget(self): 
-            return self._get()[0]
-        def fset(self, value): 
-            self._get()[0] = value 
-        return property(fget, fset, None, "software attribute") 
-    softspace = softspace()
-
-    def deinstall(self): 
-        self._used -= 1 
-        x = self._used 
-        if x <= 0: 
-            obj, attrname = self._address 
-            setattr(obj, attrname, self._oldout) 
-        
-    def setwritefunc(self, writefunc, tid=None): 
-        assert callable(writefunc)
-        if tid is None: 
-            tid = thread.get_ident() 
-        self._tid2out[tid] = [0, writefunc]
-
-    def delwritefunc(self, tid=None, ignoremissing=True): 
-        if tid is None: 
-            tid = thread.get_ident() 
-        try: 
-            del self._tid2out[tid] 
-        except KeyError: 
-            if not ignoremissing: 
-                raise 
-
-    def _get(self): 
-        tid = thread.get_ident() 
-        try: 
-            return self._tid2out[tid]
-        except KeyError: 
-            return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter 
-
-    def write(self, data): 
-        softspace, out = self._get() 
-        out(data) 
-
-    def flush(self): 
-        pass 
-   
 class Gateway(object):
     num_worker_threads = 2
     RemoteError = RemoteError
-    ThreadOut = ThreadOut
+    ThreadOut = ThreadOut 
 
     def __init__(self, io, startcount=2, maxthreads=None):
         self._execpool = WorkerPool() 

Modified: py/branch/py-collect/execnet/register.py
==============================================================================
--- py/branch/py-collect/execnet/register.py	(original)
+++ py/branch/py-collect/execnet/register.py	Fri Apr 15 17:37:15 2005
@@ -4,13 +4,33 @@
 from py.magic import autopath ; mypath = autopath()
 
 import py
-from py.__impl__.execnet import inputoutput, gateway, channel, message
+
+# the list of modules that must be send to the other side 
+# for bootstrapping gateways
+# XXX we want to have a cleaner bootstrap mechanism 
+#     by making sure early that we have the py lib available
+#     in a sufficient version 
+
+startup_modules = [
+    'py.__impl__.execnet.inputoutput', 
+    'py.__impl__.execnet.gateway', 
+    'py.__impl__.execnet.channel', 
+    'py.__impl__.execnet.message', 
+    'py.__impl__.thread.io', 
+    'py.__impl__.thread.pool', 
+]
+
+def getsource(dottedname): 
+    mod = __import__(dottedname, None, None, ['last'])
+    return inspect.getsource(mod) 
+    
+from py.__impl__.execnet import inputoutput, gateway 
 
 class InstallableGateway(gateway.Gateway):
     """ initialize gateways on both sides of a inputoutput object. """
     def __init__(self, io):
         self.remote_bootstrap_gateway(io)
-        gateway.Gateway.__init__(self, io=io, startcount=1)
+        super(InstallableGateway, self).__init__(io=io, startcount=1)
 
     def remote_bootstrap_gateway(self, io, extra=''):
         """ return Gateway with a asynchronously remotely
@@ -20,15 +40,9 @@
             gateway starts with odd numbers.  This allows to
             uniquely identify channels across both sides.
         """
-        bootstrap = [
-            extra,
-            inspect.getsource(inputoutput),
-            inspect.getsource(message),
-            inspect.getsource(channel),
-            inspect.getsource(gateway),
-            io.server_stmt,
-            "Gateway(io=io, startcount=2).join()",
-        ]
+        bootstrap = ["we_are_remote=True", extra]
+        bootstrap += [getsource(x) for x in startup_modules]
+        bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join()",]
         source = "\n".join(bootstrap)
         self.trace("sending gateway bootstrap code")
         io.write('%r\n' % source)

Modified: py/branch/py-collect/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/py-collect/execnet/testing/test_gateway.py	(original)
+++ py/branch/py-collect/execnet/testing/test_gateway.py	Fri Apr 15 17:37:15 2005
@@ -5,6 +5,24 @@
 mypath = py.magic.autopath()
 
 from StringIO import StringIO
+from py.__impl__.execnet.register import startup_modules, getsource 
+
+def test_getsource_import_modules(): 
+    for dottedname in startup_modules: 
+        yield getsource, dottedname 
+
+def test_getsource_no_colision(): 
+    seen = {}
+    for dottedname in startup_modules: 
+        mod = __import__(dottedname, None, None, ['last'])
+        for name, value in vars(mod).items(): 
+            if py.std.inspect.isclass(value): 
+                if name in seen: 
+                    olddottedname, oldval = seen[name]
+                    if oldval is not value: 
+                        py.test.fail("duplicate class %r in %s and %s" % 
+                                     (name, dottedname, olddottedname)) 
+                seen[name] = (dottedname, value) 
 
 class TestMessage:
     def test_wire_protocol(self):
@@ -50,7 +68,6 @@
         channel = self.fac.new()
         py.test.raises(IOError, channel.waitclose, timeout=0.01)
 
-
 class PopenGatewayTestSetup:
     def setup_class(cls):
         cls.gw = py.execnet.PopenGateway()

Deleted: /py/branch/py-collect/execnet/testing/test_threadpool.py
==============================================================================
--- /py/branch/py-collect/execnet/testing/test_threadpool.py	Fri Apr 15 17:37:15 2005
+++ (empty file)
@@ -1,104 +0,0 @@
-
-from py.__impl__.execnet.gateway import WorkerPool, ThreadOut
-import py
-import sys 
-
-def test_some(): 
-    pool = WorkerPool() 
-    q = py.std.Queue.Queue() 
-    num = 4
-    for i in range(num): 
-        pool.dispatch(q.put, i) 
-    for i in range(num): 
-        q.get() 
-    assert len(pool._alive) == 4 
-    pool.shutdown() 
-    assert len(pool._alive) == 0 
-    assert len(pool._ready) == 0 
-
-def test_maxthreads(): 
-    pool = WorkerPool(maxthreads=1) 
-    def f(): 
-        py.std.time.sleep(0.5) 
-    try: 
-        pool.dispatch(f) 
-        py.test.raises(IOError, pool.dispatch, f)
-    finally: 
-        pool.shutdown() 
-
-def test_shutdown_timeout(): 
-    pool = WorkerPool() 
-    def f(): 
-        py.std.time.sleep(0.5) 
-    pool.dispatch(f) 
-    py.test.raises(IOError, pool.shutdown, 0.2) 
-
-def test_pool_clean_shutdown(): 
-    pool = WorkerPool() 
-    def f(): 
-        pass 
-    pool.dispatch(f) 
-    pool.dispatch(f) 
-    pool.shutdown()
-    assert not pool._alive  
-    assert not pool._ready  
-
-def test_threadout_install_deinstall(): 
-    old = sys.stdout 
-    out = ThreadOut(sys, 'stdout') 
-    out.deinstall() 
-    assert old == sys.stdout 
-
-class TestThreadOut: 
-    def setup_method(self, method): 
-        self.out = ThreadOut(sys, 'stdout') 
-    def teardown_method(self, method): 
-        self.out.deinstall() 
-        
-    def test_threadout_one(self): 
-        l = []
-        self.out.setwritefunc(l.append) 
-        print 42,13,
-        x = l.pop(0) 
-        assert x == '42' 
-        x = l.pop(0) 
-        assert x == ' '
-        x = l.pop(0) 
-        assert x == '13' 
-
-
-    def test_threadout_multi_and_default(self): 
-        num = 3 
-        defaults = []
-        def f(l): 
-            self.out.setwritefunc(l.append) 
-            print id(l),
-            self.out.delwritefunc() 
-            print 1 
-
-        self.out.setdefaultwriter(defaults.append) 
-        pool = WorkerPool() 
-        listlist = []
-        for x in range(num): 
-            l = []
-            listlist.append(l) 
-            pool.dispatch(f, l) 
-        pool.shutdown() 
-        for name, value in self.out.__dict__.items(): 
-            print >>sys.stderr, "%s: %s" %(name, value) 
-        for i in range(num): 
-            item = listlist[i]
-            assert item ==[str(id(item))]
-        assert not self.out._tid2out 
-        assert defaults 
-        expect = ['1' for x in range(num)]
-        defaults = [x for x in defaults if x.strip()]
-        assert defaults == expect 
-
-    def test_threadout_nested(self): 
-        # we want ThreadOuts to coexist 
-        last = sys.stdout
-        out = ThreadOut(sys, 'stdout') 
-        assert last == sys.stdout 
-        out.deinstall() 
-        assert last == sys.stdout 

Modified: py/branch/py-collect/test/testing/test_session.py
==============================================================================
--- py/branch/py-collect/test/testing/test_session.py	(original)
+++ py/branch/py-collect/test/testing/test_session.py	Fri Apr 15 17:37:15 2005
@@ -143,3 +143,28 @@
         assert item.name == 'test_4' 
         names = item.listnames()
         assert names == ['ordertest', 'test_orderofexecution.py', 'Testmygroup', '()', 'test_4']
+
+class TestRemoteSession: 
+
+    def XXXtest_basic(self, method): 
+        o = tmpdir.ensure('basicremote', dir=1) 
+        tfile = o.join('test_remote.py')
+        tfile.write(py.code.Source("""
+            def test_1():
+                assert 1 == 0 
+        """))
+        config, args = py.test.Config.parse(['--looponfailing']) 
+        assert config._remote 
+        cls = config.getsessionclass() 
+        out = py.std.Queue.Queue() 
+        session = cls(config, out.put) 
+        pool = WorkerPool() 
+        reply = pool.dispatch(session.main, [o])
+        session.main([o]) 
+        out = self.file.getvalue() 
+        #print out
+        l = []
+        self.session = py.test.TerminalSession(config, file=self.file) 
+        #print >>f, "session %r is setup for %r" %(self.session, method) 
+        #f.flush()
+        #print "session is setup", self.session 

Added: py/branch/py-collect/thread/io.py
==============================================================================
--- (empty file)
+++ py/branch/py-collect/thread/io.py	Fri Apr 15 17:37:15 2005
@@ -0,0 +1,72 @@
+
+import thread
+
+class ThreadOut(object): 
+    def __new__(cls, obj, attrname): 
+        """ Divert file output to per-thread writefuncs. 
+            the given obj and attrname describe the destination 
+            of the file.  
+        """ 
+        current = getattr(obj, attrname)
+        if isinstance(current, cls): 
+            current._used += 1
+            return current 
+        self = object.__new__(cls) 
+        self._tid2out = {}
+        self._used = 1 
+        self._oldout = getattr(obj, attrname) 
+        self._defaultwriter = self._oldout.write 
+        self._address = (obj, attrname) 
+        setattr(obj, attrname, self) 
+        return self 
+
+    def setdefaultwriter(self, writefunc): 
+        self._defaultwriter = writefunc 
+
+    def resetdefault(self): 
+        self._defaultwriter = self._oldout.write
+
+    def softspace(): 
+        def fget(self): 
+            return self._get()[0]
+        def fset(self, value): 
+            self._get()[0] = value 
+        return property(fget, fset, None, "software attribute") 
+    softspace = softspace()
+
+    def deinstall(self): 
+        self._used -= 1 
+        x = self._used 
+        if x <= 0: 
+            obj, attrname = self._address 
+            setattr(obj, attrname, self._oldout) 
+        
+    def setwritefunc(self, writefunc, tid=None): 
+        assert callable(writefunc)
+        if tid is None: 
+            tid = thread.get_ident() 
+        self._tid2out[tid] = [0, writefunc]
+
+    def delwritefunc(self, tid=None, ignoremissing=True): 
+        if tid is None: 
+            tid = thread.get_ident() 
+        try: 
+            del self._tid2out[tid] 
+        except KeyError: 
+            if not ignoremissing: 
+                raise 
+
+    def _get(self): 
+        tid = thread.get_ident() 
+        try: 
+            return self._tid2out[tid]
+        except KeyError: 
+            return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter 
+
+    def write(self, data): 
+        softspace, out = self._get() 
+        out(data) 
+
+    def flush(self): 
+        pass 
+   

Copied: py/branch/py-collect/thread/pool.py (from r10656, py/branch/py-collect/execnet/gateway.py)
==============================================================================
--- py/branch/py-collect/execnet/gateway.py	(original)
+++ py/branch/py-collect/thread/pool.py	Fri Apr 15 17:37:15 2005
@@ -1,37 +1,6 @@
-import sys
-import os
-import thread, threading
-import Queue
-import traceback
-import atexit
-import time
-
-
-# XXX the following line should not be here
-g = globals()
-if 'Message' not in g:
-    from py.code import Source
-    from py.__impl__.execnet.channel import ChannelFactory, Channel
-    from py.__impl__.execnet.message import Message
-
-assert Message and ChannelFactory, "Import/Configuration Error"
-
-import os
-debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
-
-sysex = (KeyboardInterrupt, SystemExit)
-
-class RemoteError(EOFError):
-    """ Contains an Exceptions from the other side. """
-    def __init__(self, formatted):
-        self.formatted = formatted
-        EOFError.__init__(self)
-
-    def __str__(self):
-        return self.formatted
-
-    def __repr__(self):
-        return "%s: %s" %(self.__class__.__name__, self.formatted)
+import Queue 
+import threading
+import time 
 
 class WorkerThread(threading.Thread): 
     def __init__(self, pool): 
@@ -83,9 +52,6 @@
             thread = self._newthread() 
         thread.handle(task) 
 
-    def __del__(self): 
-        self.shutdown() 
-
     def shutdown(self, timeout=1.0): 
         if not self._shutdown: 
             self._shutdown = True
@@ -154,319 +120,3 @@
         thread.start() 
         l.append(thread) 
 
-class ThreadOut(object): 
-    def __new__(cls, obj, attrname): 
-        """ Divert file output to per-thread writefuncs. 
-            the given obj and attrname describe the destination 
-            of the file.  
-        """ 
-        current = getattr(obj, attrname)
-        if isinstance(current, cls): 
-            current._used += 1
-            return current 
-        self = object.__new__(cls) 
-        self._tid2out = {}
-        self._used = 1 
-        self._oldout = getattr(obj, attrname) 
-        self._defaultwriter = self._oldout.write 
-        self._address = (obj, attrname) 
-        setattr(obj, attrname, self) 
-        return self 
-
-    def setdefaultwriter(self, writefunc): 
-        self._defaultwriter = writefunc 
-
-    def resetdefault(self): 
-        self._defaultwriter = self._oldout.write
-
-    def softspace(): 
-        def fget(self): 
-            return self._get()[0]
-        def fset(self, value): 
-            self._get()[0] = value 
-        return property(fget, fset, None, "software attribute") 
-    softspace = softspace()
-
-    def deinstall(self): 
-        self._used -= 1 
-        x = self._used 
-        if x <= 0: 
-            obj, attrname = self._address 
-            setattr(obj, attrname, self._oldout) 
-        
-    def setwritefunc(self, writefunc, tid=None): 
-        assert callable(writefunc)
-        if tid is None: 
-            tid = thread.get_ident() 
-        self._tid2out[tid] = [0, writefunc]
-
-    def delwritefunc(self, tid=None, ignoremissing=True): 
-        if tid is None: 
-            tid = thread.get_ident() 
-        try: 
-            del self._tid2out[tid] 
-        except KeyError: 
-            if not ignoremissing: 
-                raise 
-
-    def _get(self): 
-        tid = thread.get_ident() 
-        try: 
-            return self._tid2out[tid]
-        except KeyError: 
-            return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter 
-
-    def write(self, data): 
-        softspace, out = self._get() 
-        out(data) 
-
-    def flush(self): 
-        pass 
-   
-class Gateway(object):
-    num_worker_threads = 2
-    RemoteError = RemoteError
-    ThreadOut = ThreadOut
-
-    def __init__(self, io, startcount=2, maxthreads=None):
-        self._execpool = WorkerPool() 
-        self.running = True 
-        self.io = io
-        self._outgoing = Queue.Queue()
-        self.channelfactory = ChannelFactory(self, startcount)
-        self._exitlock = threading.Lock()
-        self.pool = NamedThreadPool(receiver = self.thread_receiver, 
-                                    sender = self.thread_sender) 
-        if not _gateways:
-            atexit.register(cleanup_atexit)
-        _gateways.append(self)
-
-    def __repr__(self): 
-        R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving" 
-        S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
-        i = len(self.channelfactory.values())
-        return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__, 
-                                                   R, S, i) 
-
-    def _stopexec(self):
-        #self.pool.prunestopped()
-        self._execpool.shutdown() 
-
-    def exit(self):
-        # note that threads may still be scheduled to start
-        # during our execution! 
-        self._exitlock.acquire()
-        try:
-            if self.running: 
-                self.running = False 
-                self._stopexec()
-                if self.pool.getstarted('sender'): 
-                    self._outgoing.put(Message.EXIT_GATEWAY())
-                self.trace("exit procedure triggered, pid %d, gateway %r" % (
-                           os.getpid(), self))
-                _gateways.remove(self) 
-        finally:
-            self._exitlock.release()
-
-    def join(self):
-        current = threading.currentThread()
-        for x in self.pool.getstarted(): 
-            if x != current: 
-                self.trace("joining %s" % x)
-                x.join()
-        self._execpool.join()
-        self.trace("joining threads finished, current %r" % current) 
-
-    def trace(self, *args):
-        if debug:
-            try:
-                l = "\n".join(args).split(os.linesep)
-                id = getid(self)
-                for x in l:
-                    print >>debug, x
-                debug.flush()
-            except sysex:
-                raise
-            except:
-                traceback.print_exc()
-    def traceex(self, excinfo):
-        l = traceback.format_exception(*excinfo)
-        errortext = "".join(l)
-        self.trace(errortext)
-
-    def thread_receiver(self):
-        """ thread to read and handle Messages half-sync-half-async. """
-        try:
-            while 1:
-                try:
-                    msg = Message.readfrom(self.io)
-                    self.trace("received <- %r" % msg)
-                    msg.received(self)
-                except sysex:
-                    raise
-                except:
-                    self.traceex(sys.exc_info())
-                    break 
-        finally:
-            self.trace('leaving %r' % threading.currentThread())
-
-    def thread_sender(self):
-        """ thread to send Messages over the wire. """
-        try:
-            while 1:
-                msg = self._outgoing.get()
-                try:
-                    msg.writeto(self.io)
-                except:
-                    excinfo = sys.exc_info()
-                    self.traceex(excinfo)
-                    msg.post_sent(self, excinfo)
-                    raise
-                else:
-                    self.trace('sent -> %r' % msg)
-                    msg.post_sent(self)
-        finally:
-            self.trace('leaving %r' % threading.currentThread())
-
-    def _redirect_thread_output(self, outid, errid): 
-        l = []
-        for name, id in ('stdout', outid), ('stderr', errid): 
-            if id: 
-                channel = self._makechannel(outid) 
-                out = ThreadOut(sys, name)
-                out.setwritefunc(channel.send) 
-                l.append((out, channel))
-        def close(): 
-            for out, channel in l: 
-                out.delwritefunc() 
-                channel.close() 
-        return close 
-
-    def _makechannel(self, newid): 
-        newchannel = Channel(self, newid) 
-        self.channelfactory[newid] = newchannel
-        return newchannel 
-
-    def thread_executor(self, channel, (source, outid, errid)): 
-        """ worker thread to execute source objects from the execution queue. """
-        try:
-            loc = { 'channel' : channel }
-            self.trace("execution starts:", repr(source)[:50])
-            close = self._redirect_thread_output(outid, errid) 
-            try:
-                co = compile(source+'\n', '', 'exec', 4096)
-                exec co in loc
-            finally:
-                close() 
-                self.trace("execution finished:", repr(source)[:50])
-        except (KeyboardInterrupt, SystemExit):
-            raise
-        except:
-            excinfo = sys.exc_info()
-            l = traceback.format_exception(*excinfo)
-            errortext = "".join(l)
-            channel.close(errortext)
-            self.trace(errortext)
-        else:
-            channel.close()
-
-    def _scheduleexec(self, channel, sourcetask): 
-        self.trace("dispatching exec")
-        self._execpool.dispatch(self.thread_executor, channel, sourcetask) 
-
-    def _dispatchcallback(self, callback, data): 
-        # XXX this should run in a separate thread because
-        #     we might otherwise block the receiver thread 
-        #     where we get called from 
-        callback(data) 
-
-    def _remote_exec(self, channel, source, stdout=None, stderr=None): 
-        try:
-            source = str(Source(source))
-        except NameError: 
-            try: 
-                import py 
-                source = str(py.code.Source(source))
-            except ImportError: 
-                pass 
-        outid = self._redirectchannelid(stdout) 
-        errid = self._redirectchannelid(stderr) 
-        self._outgoing.put(Message.CHANNEL_OPEN(channel.id, 
-                                                (source, outid, errid))) 
-
-    def _redirectchannelid(self, callback): 
-        if callback is None: 
-            return  
-        if hasattr(callback, 'write'): 
-            callback = callback.write 
-        assert callable(callback) 
-        chan = self.newchannel() 
-        chan.setcallback(callback) 
-        return chan.id 
-
-    # _____________________________________________________________________
-    #
-    # High Level Interface
-    # _____________________________________________________________________
-    #
-    def newchannel(self): 
-        """ return new channel object. """ 
-        return self.channelfactory.new() 
-
-    def remote_exec(self, source, stdout=None, stderr=None): 
-        """ return channel object for communicating with the asynchronously
-            executing 'source' code which will have a corresponding 'channel'
-            object in its executing namespace. If a channel object is not
-            provided a new channel will be created. If a channel is provided
-            is will be returned as well. 
-        """
-        channel = self.newchannel() 
-        channel.remote_exec(source, stdout=stdout, stderr=stderr) 
-        return channel 
-
-    def remote_redirect(self, stdout=None, stderr=None): 
-        """ return a handle representing a redirection of of remote 
-            end's stdout to a local file object.  with handle.close() 
-            the redirection will be reverted.   
-        """ 
-        clist = []
-        for name, out in ('stdout', stdout), ('stderr', stderr): 
-            if out: 
-                outchannel = self.newchannel() 
-                outchannel.setcallback(getattr(out, 'write', out))
-                channel = self.remote_exec(""" 
-                    import sys
-                    outchannel = channel.receive() 
-                    outchannel.gateway.ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
-                """ % name) 
-                channel.send(outchannel)
-                clist.append(channel)
-        for c in clist: 
-            c.waitclose(1.0) 
-        class Handle: 
-            def close(_): 
-                for name, out in ('stdout', stdout), ('stderr', stderr): 
-                    if out: 
-                        c = self.remote_exec("""
-                            import sys
-                            channel.gateway.ThreadOut(sys, %r).resetdefault()
-                        """ % name) 
-                        c.waitclose(1.0) 
-        return Handle()
-
-def getid(gw, cache={}):
-    name = gw.__class__.__name__
-    try:
-        return cache.setdefault(name, {})[id(gw)]
-    except KeyError:
-        cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name]))
-        return x
-
-_gateways = []
-def cleanup_atexit():
-    if debug:
-        print >>debug, "="*20 + "cleaning up" + "=" * 20
-        debug.flush()
-    while _gateways: 
-        x = _gateways[-1]
-        x.exit()

Copied: py/branch/py-collect/thread/testing/test_threadpool.py (from r10656, py/branch/py-collect/execnet/testing/test_threadpool.py)
==============================================================================
--- py/branch/py-collect/execnet/testing/test_threadpool.py	(original)
+++ py/branch/py-collect/thread/testing/test_threadpool.py	Fri Apr 15 17:37:15 2005
@@ -1,8 +1,10 @@
 
-from py.__impl__.execnet.gateway import WorkerPool, ThreadOut
 import py
 import sys 
 
+WorkerPool = py.thread.WorkerPool 
+ThreadOut = py.thread.ThreadOut
+
 def test_some(): 
     pool = WorkerPool() 
     q = py.std.Queue.Queue() 



More information about the pytest-commit mailing list