[py-svn] r10632 - in py/branch/py-collect/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Thu Apr 14 19:12:33 CEST 2005


Author: hpk
Date: Thu Apr 14 19:12:33 2005
New Revision: 10632

Modified:
   py/branch/py-collect/execnet/gateway.py
   py/branch/py-collect/execnet/testing/test_threadpool.py
Log:
cleanup of startup/shutdown for workerthreads 


Modified: py/branch/py-collect/execnet/gateway.py
==============================================================================
--- py/branch/py-collect/execnet/gateway.py	(original)
+++ py/branch/py-collect/execnet/gateway.py	Thu Apr 14 19:12:33 2005
@@ -42,18 +42,23 @@
         self.setDaemon(1) 
 
     def run(self): 
-        while 1: 
-            task = self._queue.get() 
-            assert self not in self._pool._ready 
-            if task is None: 
-                break 
-            try: 
-                func, args, kwargs = task 
-                func(*args, **kwargs) 
-            except: 
-                import traceback
-                traceback.print_exc() 
-            self._pool._ready.append(self) 
+        try: 
+            while 1: 
+                task = self._queue.get() 
+                assert self not in self._pool._ready 
+                if task is None: 
+                    break 
+                try: 
+                    func, args, kwargs = task 
+                    func(*args, **kwargs) 
+                except (SystemExit, KeyboardInterrupt): 
+                    break 
+                except: 
+                    import traceback
+                    traceback.print_exc() 
+                self._pool._ready.append(self) 
+        finally: 
+            del self._pool._alive[self]
 
     def handle(self, task): 
         self._queue.put(task) 
@@ -67,6 +72,7 @@
         self.maxthreads = maxthreads
         self._numthreads = 0 
         self._ready = []
+        self._alive = {}
 
     def dispatch(self, func, *args, **kwargs): 
         if self._shutdown: 
@@ -85,7 +91,7 @@
         if not self._shutdown: 
             self._shutdown = True
             now = time.time() 
-            while self._numthreads: 
+            while self._alive: 
                 try: 
                     thread = self._ready.pop() 
                 except IndexError: 
@@ -93,18 +99,24 @@
                         raise IOError("Timeout: could not shut down WorkerPool") 
                     time.sleep(0.1) 
                 else: 
-                    self._numthreads -= 1 
+                    thread.stop() 
 
     def _newthread(self): 
         if self.maxthreads: 
-            if self._numthreads >= self.maxthreads: 
+            if len(self._alive) >= self.maxthreads: 
                 raise IOError("cannot create more threads, "
                               "maxthreads=%d" % (self.maxthreads,))
         thread = WorkerThread(self) 
-        self._numthreads += 1 
         thread.start() 
+        self._alive[thread] = True 
         return thread 
 
+    def join(self): 
+        current = threading.currentThread()
+        for x in self._alive.keys(): 
+            if current != x: 
+                x.join()
+
 class NamedThreadPool: 
     def __init__(self, **kw): 
         self._namedthreads = {}
@@ -161,10 +173,10 @@
         _gateways.append(self)
 
     def __repr__(self): 
-        R = len(self.pool.getstarted('receiver'))
-        S = len(self.pool.getstarted('sender'))
+        R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving" 
+        S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
         i = len(self.channelfactory.values())
-        return "<%s %d/%d (%d active channels)>" %(self.__class__.__name__, 
+        return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__, 
                                                    R, S, i) 
 
     def _stopexec(self):
@@ -193,6 +205,7 @@
             if x != current: 
                 self.trace("joining %s" % x)
                 x.join()
+        self._execpool.join()
         self.trace("joining threads finished, current %r" % current) 
 
     def trace(self, *args):

Modified: py/branch/py-collect/execnet/testing/test_threadpool.py
==============================================================================
--- py/branch/py-collect/execnet/testing/test_threadpool.py	(original)
+++ py/branch/py-collect/execnet/testing/test_threadpool.py	Thu Apr 14 19:12:33 2005
@@ -4,16 +4,17 @@
 
 def test_some(): 
     pool = WorkerPool() 
-    l = []
-    try: 
-        pool.dispatch(l.append, 1) 
-        pool.dispatch(l.append, 2) 
-        pool.dispatch(l.append, 3) 
-        pool.dispatch(l.append, 4) 
-    finally: 
-        pool.shutdown() 
-    assert len(pool._ready) == pool._numthreads 
-    assert len(l) == 4
+    q = py.std.Queue.Queue() 
+    num = 4
+    for i in range(num): 
+        pool.dispatch(q.put, i) 
+    for i in range(num): 
+        q.get() 
+    assert len(pool._alive) == 4 
+    assert len(pool._ready) == 4 
+    pool.shutdown() 
+    assert len(pool._alive) == 0 
+    assert len(pool._ready) == 0 
 
 def test_maxthreads(): 
     pool = WorkerPool(maxthreads=1) 
@@ -28,6 +29,16 @@
 def test_shutdown_timeout(): 
     pool = WorkerPool() 
     def f(): 
-        py.std.time.sleep(1.5) 
+        py.std.time.sleep(0.5) 
     pool.dispatch(f) 
     py.test.raises(IOError, pool.shutdown, 0.2) 
+
+def test_pool_clean_shutdown(): 
+    pool = WorkerPool() 
+    def f(): 
+        pass 
+    pool.dispatch(f) 
+    pool.dispatch(f) 
+    pool.shutdown()
+    assert not pool._alive  
+    assert not pool._ready  



More information about the pytest-commit mailing list