[py-svn] r10632 - in py/branch/py-collect/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Thu Apr 14 19:12:33 CEST 2005
Author: hpk
Date: Thu Apr 14 19:12:33 2005
New Revision: 10632
Modified:
py/branch/py-collect/execnet/gateway.py
py/branch/py-collect/execnet/testing/test_threadpool.py
Log:
cleanup of startup/shutdown for workerthreads
Modified: py/branch/py-collect/execnet/gateway.py
==============================================================================
--- py/branch/py-collect/execnet/gateway.py (original)
+++ py/branch/py-collect/execnet/gateway.py Thu Apr 14 19:12:33 2005
@@ -42,18 +42,23 @@
self.setDaemon(1)
def run(self):
- while 1:
- task = self._queue.get()
- assert self not in self._pool._ready
- if task is None:
- break
- try:
- func, args, kwargs = task
- func(*args, **kwargs)
- except:
- import traceback
- traceback.print_exc()
- self._pool._ready.append(self)
+ try:
+ while 1:
+ task = self._queue.get()
+ assert self not in self._pool._ready
+ if task is None:
+ break
+ try:
+ func, args, kwargs = task
+ func(*args, **kwargs)
+ except (SystemExit, KeyboardInterrupt):
+ break
+ except:
+ import traceback
+ traceback.print_exc()
+ self._pool._ready.append(self)
+ finally:
+ del self._pool._alive[self]
def handle(self, task):
self._queue.put(task)
@@ -67,6 +72,7 @@
self.maxthreads = maxthreads
self._numthreads = 0
self._ready = []
+ self._alive = {}
def dispatch(self, func, *args, **kwargs):
if self._shutdown:
@@ -85,7 +91,7 @@
if not self._shutdown:
self._shutdown = True
now = time.time()
- while self._numthreads:
+ while self._alive:
try:
thread = self._ready.pop()
except IndexError:
@@ -93,18 +99,24 @@
raise IOError("Timeout: could not shut down WorkerPool")
time.sleep(0.1)
else:
- self._numthreads -= 1
+ thread.stop()
def _newthread(self):
if self.maxthreads:
- if self._numthreads >= self.maxthreads:
+ if len(self._alive) >= self.maxthreads:
raise IOError("cannot create more threads, "
"maxthreads=%d" % (self.maxthreads,))
thread = WorkerThread(self)
- self._numthreads += 1
thread.start()
+ self._alive[thread] = True
return thread
+ def join(self):
+ current = threading.currentThread()
+ for x in self._alive.keys():
+ if current != x:
+ x.join()
+
class NamedThreadPool:
def __init__(self, **kw):
self._namedthreads = {}
@@ -161,10 +173,10 @@
_gateways.append(self)
def __repr__(self):
- R = len(self.pool.getstarted('receiver'))
- S = len(self.pool.getstarted('sender'))
+ R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving"
+ S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
i = len(self.channelfactory.values())
- return "<%s %d/%d (%d active channels)>" %(self.__class__.__name__,
+ return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__,
R, S, i)
def _stopexec(self):
@@ -193,6 +205,7 @@
if x != current:
self.trace("joining %s" % x)
x.join()
+ self._execpool.join()
self.trace("joining threads finished, current %r" % current)
def trace(self, *args):
Modified: py/branch/py-collect/execnet/testing/test_threadpool.py
==============================================================================
--- py/branch/py-collect/execnet/testing/test_threadpool.py (original)
+++ py/branch/py-collect/execnet/testing/test_threadpool.py Thu Apr 14 19:12:33 2005
@@ -4,16 +4,17 @@
def test_some():
pool = WorkerPool()
- l = []
- try:
- pool.dispatch(l.append, 1)
- pool.dispatch(l.append, 2)
- pool.dispatch(l.append, 3)
- pool.dispatch(l.append, 4)
- finally:
- pool.shutdown()
- assert len(pool._ready) == pool._numthreads
- assert len(l) == 4
+ q = py.std.Queue.Queue()
+ num = 4
+ for i in range(num):
+ pool.dispatch(q.put, i)
+ for i in range(num):
+ q.get()
+ assert len(pool._alive) == 4
+ assert len(pool._ready) == 4
+ pool.shutdown()
+ assert len(pool._alive) == 0
+ assert len(pool._ready) == 0
def test_maxthreads():
pool = WorkerPool(maxthreads=1)
@@ -28,6 +29,16 @@
def test_shutdown_timeout():
pool = WorkerPool()
def f():
- py.std.time.sleep(1.5)
+ py.std.time.sleep(0.5)
pool.dispatch(f)
py.test.raises(IOError, pool.shutdown, 0.2)
+
+def test_pool_clean_shutdown():
+ pool = WorkerPool()
+ def f():
+ pass
+ pool.dispatch(f)
+ pool.dispatch(f)
+ pool.shutdown()
+ assert not pool._alive
+ assert not pool._ready
More information about the pytest-commit
mailing list