[py-svn] r11006 - in py/dist/py/thread: . testing
hpk at codespeak.net
hpk at codespeak.net
Thu Apr 21 23:48:29 CEST 2005
Author: hpk
Date: Thu Apr 21 23:48:29 2005
New Revision: 11006
Modified:
py/dist/py/thread/pool.py
py/dist/py/thread/testing/test_pool.py
Log:
fixing tests and improving readability of the WorkerPools
Modified: py/dist/py/thread/pool.py
==============================================================================
--- py/dist/py/thread/pool.py (original)
+++ py/dist/py/thread/pool.py Thu Apr 21 23:48:29 2005
@@ -3,46 +3,6 @@
import time
import sys
-class WorkerThread(threading.Thread):
- def __init__(self, pool):
- threading.Thread.__init__(self)
- self._queue = Queue.Queue()
- self._pool = pool
- self.setDaemon(1)
-
- def run(self):
- try:
- while 1:
- reply = self._queue.get()
- if reply is None:
- break
- assert self not in self._pool._ready
- task = reply.task
- try:
- func, args, kwargs = task
- result = func(*args, **kwargs)
- except (SystemExit, KeyboardInterrupt):
- break
- except:
- reply.setexcinfo(sys.exc_info())
- else:
- reply.set(result)
- self._pool._ready[self] = True
- finally:
- del self._pool._alive[self]
- try:
- del self._pool._ready[self]
- except KeyError:
- pass
-
- def send(self, task):
- reply = Reply(task)
- self._queue.put(reply)
- return reply
-
- def stop(self):
- self._queue.put(None)
-
class Reply(object):
_excinfo = None
def __init__(self, task):
@@ -81,6 +41,44 @@
self._excinfo = None
raise excinfo[0], excinfo[1], excinfo[2]
return result
+
+class WorkerThread(threading.Thread):
+ def __init__(self, pool):
+ threading.Thread.__init__(self)
+ self._queue = Queue.Queue()
+ self._pool = pool
+ self.setDaemon(1)
+
+ def run(self):
+ try:
+ while 1:
+ reply = self._queue.get()
+ assert self not in self._pool._ready
+ task = reply.task
+ try:
+ func, args, kwargs = task
+ result = func(*args, **kwargs)
+ except (SystemExit, KeyboardInterrupt):
+ break
+ except:
+ reply.setexcinfo(sys.exc_info())
+ else:
+ reply.set(result)
+ self._pool._ready[self] = True
+ finally:
+ del self._pool._alive[self]
+ try:
+ del self._pool._ready[self]
+ except KeyError:
+ pass
+
+ def send(self, task):
+ reply = Reply(task)
+ self._queue.put(reply)
+ return reply
+
+ def stop(self):
+ self._queue.put(SystemExit)
class WorkerPool(object):
_shuttingdown = False
@@ -90,43 +88,46 @@
self._alive = {}
def dispatch(self, func, *args, **kwargs):
+ """ return Reply object for the asynchronous dispatch
+ of the given func(*args, **kwargs) in a
+ separate worker thread.
+ """
if self._shuttingdown:
raise IOError("WorkerPool is already shutting down")
try:
thread, _ = self._ready.popitem()
except KeyError: # pop from empty list
+ if self.maxthreads and len(self._alive) >= self.maxthreads:
+ raise IOError("can't create more than %d threads." %
+ (self.maxthreads,))
thread = self._newthread()
return thread.send((func, args, kwargs))
+ def _newthread(self):
+ thread = WorkerThread(self)
+ self._alive[thread] = True
+ thread.start()
+ return thread
+
def shutdown(self):
if not self._shuttingdown:
self._shuttingdown = True
for t in self._alive.keys():
t.stop()
- def _newthread(self):
- if self.maxthreads:
- if len(self._alive) >= self.maxthreads:
- raise IOError("cannot create more threads, "
- "maxthreads=%d" % (self.maxthreads,))
- thread = WorkerThread(self)
- thread.start()
- self._alive[thread] = True
- return thread
-
def join(self, timeout=None):
current = threading.currentThread()
- now = time.time()
- while self._alive:
- try:
- thread, _ = self._ready.popitem()
- except KeyError:
- if timeout and time.time() >= now + timeout:
- raise IOError("timeout waiting for threads")
- time.sleep(0.1)
- else:
- thread.join()
-
+ deadline = delta = None
+ if timeout is not None:
+ deadline = time.time() + timeout
+ for thread in self._alive.keys():
+ if deadline:
+ delta = deadline - time.time()
+ if delta <= 0:
+ raise IOError("timeout while joining threads")
+ thread.join(timeout=delta)
+ if thread.isAlive():
+ raise IOError("timeout while joining threads")
class NamedThreadPool:
def __init__(self, **kw):
Modified: py/dist/py/thread/testing/test_pool.py
==============================================================================
--- py/dist/py/thread/testing/test_pool.py (original)
+++ py/dist/py/thread/testing/test_pool.py Thu Apr 21 23:48:29 2005
@@ -9,8 +9,13 @@
pool = WorkerPool()
q = py.std.Queue.Queue()
num = 4
+
+ def f(i):
+ q.put(i)
+ while q.qsize():
+ py.std.time.sleep(0.01)
for i in range(num):
- pool.dispatch(q.put, i)
+ pool.dispatch(f, i)
for i in range(num):
q.get()
assert len(pool._alive) == 4
@@ -52,13 +57,17 @@
finally:
pool.shutdown()
-def test_shutdown_timeout():
+def test_join_timeout():
pool = WorkerPool()
+ q = py.std.Queue.Queue()
def f():
- py.std.time.sleep(0.5)
- pool.dispatch(f)
+ q.get()
+ reply = pool.dispatch(f)
pool.shutdown()
- py.test.raises(IOError, pool.join, 0.2)
+ py.test.raises(IOError, pool.join, 0.01)
+ q.put(None)
+ reply.get(timeout=1.0)
+ pool.join(timeout=0.1)
def test_pool_clean_shutdown():
pool = WorkerPool()
More information about the pytest-commit
mailing list