[py-svn] r11006 - in py/dist/py/thread: . testing

hpk at codespeak.net hpk at codespeak.net
Thu Apr 21 23:48:29 CEST 2005


Author: hpk
Date: Thu Apr 21 23:48:29 2005
New Revision: 11006

Modified:
   py/dist/py/thread/pool.py
   py/dist/py/thread/testing/test_pool.py
Log:
Fix the tests and improve the readability of the WorkerPool code.



Modified: py/dist/py/thread/pool.py
==============================================================================
--- py/dist/py/thread/pool.py	(original)
+++ py/dist/py/thread/pool.py	Thu Apr 21 23:48:29 2005
@@ -3,46 +3,6 @@
 import time 
 import sys
 
-class WorkerThread(threading.Thread): 
-    def __init__(self, pool): 
-        threading.Thread.__init__(self) 
-        self._queue = Queue.Queue() 
-        self._pool = pool 
-        self.setDaemon(1) 
-
-    def run(self): 
-        try: 
-            while 1: 
-                reply = self._queue.get() 
-                if reply is None: 
-                    break 
-                assert self not in self._pool._ready 
-                task = reply.task 
-                try: 
-                    func, args, kwargs = task 
-                    result = func(*args, **kwargs) 
-                except (SystemExit, KeyboardInterrupt): 
-                    break 
-                except: 
-                    reply.setexcinfo(sys.exc_info()) 
-                else: 
-                    reply.set(result) 
-                self._pool._ready[self] = True 
-        finally: 
-            del self._pool._alive[self]
-            try: 
-                del self._pool._ready[self]
-            except KeyError: 
-                pass
-
-    def send(self, task): 
-        reply = Reply(task) 
-        self._queue.put(reply) 
-        return reply 
-
-    def stop(self): 
-        self._queue.put(None) 
-
 class Reply(object): 
     _excinfo = None 
     def __init__(self, task): 
@@ -81,6 +41,44 @@
             self._excinfo = None 
             raise excinfo[0], excinfo[1], excinfo[2]
         return result 
+
+class WorkerThread(threading.Thread): 
+    def __init__(self, pool): 
+        threading.Thread.__init__(self) 
+        self._queue = Queue.Queue() 
+        self._pool = pool 
+        self.setDaemon(1) 
+
+    def run(self): 
+        try: 
+            while 1: 
+                reply = self._queue.get() 
+                assert self not in self._pool._ready 
+                task = reply.task 
+                try: 
+                    func, args, kwargs = task 
+                    result = func(*args, **kwargs) 
+                except (SystemExit, KeyboardInterrupt): 
+                    break 
+                except: 
+                    reply.setexcinfo(sys.exc_info()) 
+                else: 
+                    reply.set(result) 
+                self._pool._ready[self] = True 
+        finally: 
+            del self._pool._alive[self]
+            try: 
+                del self._pool._ready[self]
+            except KeyError: 
+                pass
+
+    def send(self, task): 
+        reply = Reply(task) 
+        self._queue.put(reply) 
+        return reply 
+
+    def stop(self): 
+        self._queue.put(SystemExit)   
         
 class WorkerPool(object): 
     _shuttingdown = False 
@@ -90,43 +88,46 @@
         self._alive = {}
 
     def dispatch(self, func, *args, **kwargs): 
+        """ return Reply object for the asynchronous dispatch 
+            of the given func(*args, **kwargs) in a 
+            separate worker thread. 
+        """
         if self._shuttingdown: 
             raise IOError("WorkerPool is already shutting down") 
         try: 
             thread, _ = self._ready.popitem() 
         except KeyError: # pop from empty list
+            if self.maxthreads and len(self._alive) >= self.maxthreads: 
+                raise IOError("can't create more than %d threads." %
+                              (self.maxthreads,))
             thread = self._newthread() 
         return thread.send((func, args, kwargs))
 
+    def _newthread(self): 
+        thread = WorkerThread(self) 
+        self._alive[thread] = True 
+        thread.start() 
+        return thread 
+
     def shutdown(self): 
         if not self._shuttingdown: 
             self._shuttingdown = True 
             for t in self._alive.keys(): 
                 t.stop() 
 
-    def _newthread(self): 
-        if self.maxthreads: 
-            if len(self._alive) >= self.maxthreads: 
-                raise IOError("cannot create more threads, "
-                              "maxthreads=%d" % (self.maxthreads,))
-        thread = WorkerThread(self) 
-        thread.start() 
-        self._alive[thread] = True 
-        return thread 
-
     def join(self, timeout=None): 
         current = threading.currentThread()
-        now = time.time() 
-        while self._alive: 
-            try: 
-                thread, _ = self._ready.popitem()
-            except KeyError: 
-                if timeout and time.time() >= now + timeout: 
-                    raise IOError("timeout waiting for threads") 
-                time.sleep(0.1) 
-            else: 
-                thread.join() 
-            
+        deadline = delta = None 
+        if timeout is not None: 
+            deadline = time.time() + timeout 
+        for thread in self._alive.keys(): 
+            if deadline: 
+                delta = deadline - time.time() 
+                if delta <= 0: 
+                    raise IOError("timeout while joining threads") 
+            thread.join(timeout=delta) 
+            if thread.isAlive(): 
+                raise IOError("timeout while joining threads") 
 
 class NamedThreadPool: 
     def __init__(self, **kw): 

Modified: py/dist/py/thread/testing/test_pool.py
==============================================================================
--- py/dist/py/thread/testing/test_pool.py	(original)
+++ py/dist/py/thread/testing/test_pool.py	Thu Apr 21 23:48:29 2005
@@ -9,8 +9,13 @@
     pool = WorkerPool()
     q = py.std.Queue.Queue()
     num = 4
+
+    def f(i): 
+        q.put(i) 
+        while q.qsize(): 
+            py.std.time.sleep(0.01) 
     for i in range(num):
-        pool.dispatch(q.put, i)
+        pool.dispatch(f, i) 
     for i in range(num):
         q.get()
     assert len(pool._alive) == 4
@@ -52,13 +57,17 @@
     finally:
         pool.shutdown()
 
-def test_shutdown_timeout():
+def test_join_timeout():
     pool = WorkerPool()
+    q = py.std.Queue.Queue()
     def f():
-        py.std.time.sleep(0.5)
-    pool.dispatch(f)
+        q.get() 
+    reply = pool.dispatch(f)
     pool.shutdown()
-    py.test.raises(IOError, pool.join, 0.2)
+    py.test.raises(IOError, pool.join, 0.01)
+    q.put(None)
+    reply.get(timeout=1.0) 
+    pool.join(timeout=0.1) 
 
 def test_pool_clean_shutdown():
     pool = WorkerPool()



More information about the pytest-commit mailing list