[py-svn] r10680 - in py/branch/py-collect: . execnet execnet/testing test/testing thread thread/testing
hpk at codespeak.net
hpk at codespeak.net
Fri Apr 15 17:37:15 CEST 2005
Author: hpk
Date: Fri Apr 15 17:37:15 2005
New Revision: 10680
Added:
py/branch/py-collect/thread/ (props changed)
py/branch/py-collect/thread/__init__.py
- copied unchanged from r10656, py/branch/py-collect/test/__init__.py
py/branch/py-collect/thread/io.py
py/branch/py-collect/thread/pool.py
- copied, changed from r10656, py/branch/py-collect/execnet/gateway.py
py/branch/py-collect/thread/testing/ (props changed)
py/branch/py-collect/thread/testing/__init__.py
- copied unchanged from r10656, py/branch/py-collect/test/__init__.py
py/branch/py-collect/thread/testing/test_threadpool.py (contents, props changed)
- copied, changed from r10656, py/branch/py-collect/execnet/testing/test_threadpool.py
Removed:
py/branch/py-collect/execnet/testing/test_threadpool.py
Modified:
py/branch/py-collect/__init__.py
py/branch/py-collect/execnet/gateway.py
py/branch/py-collect/execnet/register.py
py/branch/py-collect/execnet/testing/test_gateway.py
py/branch/py-collect/test/testing/test_session.py
Log:
refactoring thread-related stuff from py.execnet to
a new namespace: py.thread.
However, i am going to change it to py._thread for the time
being because the design of the supplied helper-classes is
somewhat floating. Samuele Pedroni pointed me to the way Java
1.5 organizes Threads which might be an interesting source
(although heavily convoluted to read :-) for design
considerations.
Modified: py/branch/py-collect/__init__.py
==============================================================================
--- py/branch/py-collect/__init__.py (original)
+++ py/branch/py-collect/__init__.py Fri Apr 15 17:37:15 2005
@@ -26,6 +26,11 @@
'test.Item' : ('./test/item.py', 'Item'),
'test.Function' : ('./test/item.py', 'Function'),
+ # thread related API
+ 'thread.WorkerPool' : ('./thread/pool.py', 'WorkerPool'),
+ 'thread.NamedThreadPool' : ('./thread/pool.py', 'NamedThreadPool'),
+ 'thread.ThreadOut' : ('./thread/io.py', 'ThreadOut'),
+
# hook into the top-level standard library
'std' : ('./misc/std.py', 'std'),
Modified: py/branch/py-collect/execnet/gateway.py
==============================================================================
--- py/branch/py-collect/execnet/gateway.py (original)
+++ py/branch/py-collect/execnet/gateway.py Fri Apr 15 17:37:15 2005
@@ -8,11 +8,14 @@
# XXX the following line should not be here
-g = globals()
-if 'Message' not in g:
+if 'Message' not in globals():
+ import py
from py.code import Source
from py.__impl__.execnet.channel import ChannelFactory, Channel
from py.__impl__.execnet.message import Message
+ ThreadOut = py.thread.ThreadOut
+ WorkerPool = py.thread.WorkerPool
+ NamedThreadPool = py.thread.NamedThreadPool
assert Message and ChannelFactory, "Import/Configuration Error"
@@ -33,200 +36,10 @@
def __repr__(self):
return "%s: %s" %(self.__class__.__name__, self.formatted)
-class WorkerThread(threading.Thread):
- def __init__(self, pool):
- super(WorkerThread, self).__init__()
- self._queue = Queue.Queue()
- self._pool = pool
- self.setDaemon(1)
-
- def run(self):
- try:
- while 1:
- task = self._queue.get()
- assert self not in self._pool._ready
- if task is None:
- break
- try:
- func, args, kwargs = task
- func(*args, **kwargs)
- except (SystemExit, KeyboardInterrupt):
- break
- except:
- import traceback
- traceback.print_exc()
- self._pool._ready.append(self)
- finally:
- del self._pool._alive[self]
-
- def handle(self, task):
- self._queue.put(task)
-
- def stop(self):
- self._queue.put(None)
-
-class WorkerPool(object):
- _shutdown = False
- def __init__(self, maxthreads=None):
- self.maxthreads = maxthreads
- self._numthreads = 0
- self._ready = []
- self._alive = {}
-
- def dispatch(self, func, *args, **kwargs):
- if self._shutdown:
- raise IOError("WorkerPool is already shutting down")
- task = (func, args, kwargs)
- try:
- thread = self._ready.pop()
- except IndexError: # pop from empty list
- thread = self._newthread()
- thread.handle(task)
-
- def __del__(self):
- self.shutdown()
-
- def shutdown(self, timeout=1.0):
- if not self._shutdown:
- self._shutdown = True
- now = time.time()
- while self._alive:
- try:
- thread = self._ready.pop()
- except IndexError:
- if now + timeout < time.time():
- raise IOError("Timeout: could not shut down WorkerPool")
- time.sleep(0.1)
- else:
- thread.stop()
-
- def _newthread(self):
- if self.maxthreads:
- if len(self._alive) >= self.maxthreads:
- raise IOError("cannot create more threads, "
- "maxthreads=%d" % (self.maxthreads,))
- thread = WorkerThread(self)
- thread.start()
- self._alive[thread] = True
- return thread
-
- def join(self):
- current = threading.currentThread()
- for x in self._alive.keys():
- if current != x:
- x.join()
-
-class NamedThreadPool:
- def __init__(self, **kw):
- self._namedthreads = {}
- for name, value in kw.items():
- self.start(name, value)
-
- def __repr__(self):
- return "<NamedThreadPool %r>" %(self._namedthreads)
-
- def get(self, name=None):
- if name is None:
- l = []
- for x in self._namedthreads.values():
- l.extend(x)
- return l
- else:
- return self._namedthreads.get(name, [])
-
- def getstarted(self, name=None):
- return [t for t in self.get(name) if t.isAlive()]
-
- def prunestopped(self, name=None):
- if name is None:
- for name in self.names():
- self.prunestopped(name)
- else:
- self._namedthreads[name] = self.getstarted(name)
-
- def names(self):
- return self._namedthreads.keys()
-
- def start(self, name, func):
- l = self._namedthreads.setdefault(name, [])
- thread = threading.Thread(name="%s%d" % (name, len(l)),
- target=func)
- thread.start()
- l.append(thread)
-
-class ThreadOut(object):
- def __new__(cls, obj, attrname):
- """ Divert file output to per-thread writefuncs.
- the given obj and attrname describe the destination
- of the file.
- """
- current = getattr(obj, attrname)
- if isinstance(current, cls):
- current._used += 1
- return current
- self = object.__new__(cls)
- self._tid2out = {}
- self._used = 1
- self._oldout = getattr(obj, attrname)
- self._defaultwriter = self._oldout.write
- self._address = (obj, attrname)
- setattr(obj, attrname, self)
- return self
-
- def setdefaultwriter(self, writefunc):
- self._defaultwriter = writefunc
-
- def resetdefault(self):
- self._defaultwriter = self._oldout.write
-
- def softspace():
- def fget(self):
- return self._get()[0]
- def fset(self, value):
- self._get()[0] = value
- return property(fget, fset, None, "software attribute")
- softspace = softspace()
-
- def deinstall(self):
- self._used -= 1
- x = self._used
- if x <= 0:
- obj, attrname = self._address
- setattr(obj, attrname, self._oldout)
-
- def setwritefunc(self, writefunc, tid=None):
- assert callable(writefunc)
- if tid is None:
- tid = thread.get_ident()
- self._tid2out[tid] = [0, writefunc]
-
- def delwritefunc(self, tid=None, ignoremissing=True):
- if tid is None:
- tid = thread.get_ident()
- try:
- del self._tid2out[tid]
- except KeyError:
- if not ignoremissing:
- raise
-
- def _get(self):
- tid = thread.get_ident()
- try:
- return self._tid2out[tid]
- except KeyError:
- return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter
-
- def write(self, data):
- softspace, out = self._get()
- out(data)
-
- def flush(self):
- pass
-
class Gateway(object):
num_worker_threads = 2
RemoteError = RemoteError
- ThreadOut = ThreadOut
+ ThreadOut = ThreadOut
def __init__(self, io, startcount=2, maxthreads=None):
self._execpool = WorkerPool()
Modified: py/branch/py-collect/execnet/register.py
==============================================================================
--- py/branch/py-collect/execnet/register.py (original)
+++ py/branch/py-collect/execnet/register.py Fri Apr 15 17:37:15 2005
@@ -4,13 +4,33 @@
from py.magic import autopath ; mypath = autopath()
import py
-from py.__impl__.execnet import inputoutput, gateway, channel, message
+
+# the list of modules that must be send to the other side
+# for bootstrapping gateways
+# XXX we want to have a cleaner bootstrap mechanism
+# by making sure early that we have the py lib available
+# in a sufficient version
+
+startup_modules = [
+ 'py.__impl__.execnet.inputoutput',
+ 'py.__impl__.execnet.gateway',
+ 'py.__impl__.execnet.channel',
+ 'py.__impl__.execnet.message',
+ 'py.__impl__.thread.io',
+ 'py.__impl__.thread.pool',
+]
+
+def getsource(dottedname):
+ mod = __import__(dottedname, None, None, ['last'])
+ return inspect.getsource(mod)
+
+from py.__impl__.execnet import inputoutput, gateway
class InstallableGateway(gateway.Gateway):
""" initialize gateways on both sides of a inputoutput object. """
def __init__(self, io):
self.remote_bootstrap_gateway(io)
- gateway.Gateway.__init__(self, io=io, startcount=1)
+ super(InstallableGateway, self).__init__(io=io, startcount=1)
def remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely
@@ -20,15 +40,9 @@
gateway starts with odd numbers. This allows to
uniquely identify channels across both sides.
"""
- bootstrap = [
- extra,
- inspect.getsource(inputoutput),
- inspect.getsource(message),
- inspect.getsource(channel),
- inspect.getsource(gateway),
- io.server_stmt,
- "Gateway(io=io, startcount=2).join()",
- ]
+ bootstrap = ["we_are_remote=True", extra]
+ bootstrap += [getsource(x) for x in startup_modules]
+ bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join()",]
source = "\n".join(bootstrap)
self.trace("sending gateway bootstrap code")
io.write('%r\n' % source)
Modified: py/branch/py-collect/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/py-collect/execnet/testing/test_gateway.py (original)
+++ py/branch/py-collect/execnet/testing/test_gateway.py Fri Apr 15 17:37:15 2005
@@ -5,6 +5,24 @@
mypath = py.magic.autopath()
from StringIO import StringIO
+from py.__impl__.execnet.register import startup_modules, getsource
+
+def test_getsource_import_modules():
+ for dottedname in startup_modules:
+ yield getsource, dottedname
+
+def test_getsource_no_colision():
+ seen = {}
+ for dottedname in startup_modules:
+ mod = __import__(dottedname, None, None, ['last'])
+ for name, value in vars(mod).items():
+ if py.std.inspect.isclass(value):
+ if name in seen:
+ olddottedname, oldval = seen[name]
+ if oldval is not value:
+ py.test.fail("duplicate class %r in %s and %s" %
+ (name, dottedname, olddottedname))
+ seen[name] = (dottedname, value)
class TestMessage:
def test_wire_protocol(self):
@@ -50,7 +68,6 @@
channel = self.fac.new()
py.test.raises(IOError, channel.waitclose, timeout=0.01)
-
class PopenGatewayTestSetup:
def setup_class(cls):
cls.gw = py.execnet.PopenGateway()
Deleted: /py/branch/py-collect/execnet/testing/test_threadpool.py
==============================================================================
--- /py/branch/py-collect/execnet/testing/test_threadpool.py Fri Apr 15 17:37:15 2005
+++ (empty file)
@@ -1,104 +0,0 @@
-
-from py.__impl__.execnet.gateway import WorkerPool, ThreadOut
-import py
-import sys
-
-def test_some():
- pool = WorkerPool()
- q = py.std.Queue.Queue()
- num = 4
- for i in range(num):
- pool.dispatch(q.put, i)
- for i in range(num):
- q.get()
- assert len(pool._alive) == 4
- pool.shutdown()
- assert len(pool._alive) == 0
- assert len(pool._ready) == 0
-
-def test_maxthreads():
- pool = WorkerPool(maxthreads=1)
- def f():
- py.std.time.sleep(0.5)
- try:
- pool.dispatch(f)
- py.test.raises(IOError, pool.dispatch, f)
- finally:
- pool.shutdown()
-
-def test_shutdown_timeout():
- pool = WorkerPool()
- def f():
- py.std.time.sleep(0.5)
- pool.dispatch(f)
- py.test.raises(IOError, pool.shutdown, 0.2)
-
-def test_pool_clean_shutdown():
- pool = WorkerPool()
- def f():
- pass
- pool.dispatch(f)
- pool.dispatch(f)
- pool.shutdown()
- assert not pool._alive
- assert not pool._ready
-
-def test_threadout_install_deinstall():
- old = sys.stdout
- out = ThreadOut(sys, 'stdout')
- out.deinstall()
- assert old == sys.stdout
-
-class TestThreadOut:
- def setup_method(self, method):
- self.out = ThreadOut(sys, 'stdout')
- def teardown_method(self, method):
- self.out.deinstall()
-
- def test_threadout_one(self):
- l = []
- self.out.setwritefunc(l.append)
- print 42,13,
- x = l.pop(0)
- assert x == '42'
- x = l.pop(0)
- assert x == ' '
- x = l.pop(0)
- assert x == '13'
-
-
- def test_threadout_multi_and_default(self):
- num = 3
- defaults = []
- def f(l):
- self.out.setwritefunc(l.append)
- print id(l),
- self.out.delwritefunc()
- print 1
-
- self.out.setdefaultwriter(defaults.append)
- pool = WorkerPool()
- listlist = []
- for x in range(num):
- l = []
- listlist.append(l)
- pool.dispatch(f, l)
- pool.shutdown()
- for name, value in self.out.__dict__.items():
- print >>sys.stderr, "%s: %s" %(name, value)
- for i in range(num):
- item = listlist[i]
- assert item ==[str(id(item))]
- assert not self.out._tid2out
- assert defaults
- expect = ['1' for x in range(num)]
- defaults = [x for x in defaults if x.strip()]
- assert defaults == expect
-
- def test_threadout_nested(self):
- # we want ThreadOuts to coexist
- last = sys.stdout
- out = ThreadOut(sys, 'stdout')
- assert last == sys.stdout
- out.deinstall()
- assert last == sys.stdout
Modified: py/branch/py-collect/test/testing/test_session.py
==============================================================================
--- py/branch/py-collect/test/testing/test_session.py (original)
+++ py/branch/py-collect/test/testing/test_session.py Fri Apr 15 17:37:15 2005
@@ -143,3 +143,28 @@
assert item.name == 'test_4'
names = item.listnames()
assert names == ['ordertest', 'test_orderofexecution.py', 'Testmygroup', '()', 'test_4']
+
+class TestRemoteSession:
+
+ def XXXtest_basic(self, method):
+ o = tmpdir.ensure('basicremote', dir=1)
+ tfile = o.join('test_remote.py')
+ tfile.write(py.code.Source("""
+ def test_1():
+ assert 1 == 0
+ """))
+ config, args = py.test.Config.parse(['--looponfailing'])
+ assert config._remote
+ cls = config.getsessionclass()
+ out = py.std.Queue.Queue()
+ session = cls(config, out.put)
+ pool = WorkerPool()
+ reply = pool.dispatch(session.main, [o])
+ session.main([o])
+ out = self.file.getvalue()
+ #print out
+ l = []
+ self.session = py.test.TerminalSession(config, file=self.file)
+ #print >>f, "session %r is setup for %r" %(self.session, method)
+ #f.flush()
+ #print "session is setup", self.session
Added: py/branch/py-collect/thread/io.py
==============================================================================
--- (empty file)
+++ py/branch/py-collect/thread/io.py Fri Apr 15 17:37:15 2005
@@ -0,0 +1,72 @@
+
+import thread
+
+class ThreadOut(object):
+ def __new__(cls, obj, attrname):
+ """ Divert file output to per-thread writefuncs.
+ the given obj and attrname describe the destination
+ of the file.
+ """
+ current = getattr(obj, attrname)
+ if isinstance(current, cls):
+ current._used += 1
+ return current
+ self = object.__new__(cls)
+ self._tid2out = {}
+ self._used = 1
+ self._oldout = getattr(obj, attrname)
+ self._defaultwriter = self._oldout.write
+ self._address = (obj, attrname)
+ setattr(obj, attrname, self)
+ return self
+
+ def setdefaultwriter(self, writefunc):
+ self._defaultwriter = writefunc
+
+ def resetdefault(self):
+ self._defaultwriter = self._oldout.write
+
+ def softspace():
+ def fget(self):
+ return self._get()[0]
+ def fset(self, value):
+ self._get()[0] = value
+ return property(fget, fset, None, "software attribute")
+ softspace = softspace()
+
+ def deinstall(self):
+ self._used -= 1
+ x = self._used
+ if x <= 0:
+ obj, attrname = self._address
+ setattr(obj, attrname, self._oldout)
+
+ def setwritefunc(self, writefunc, tid=None):
+ assert callable(writefunc)
+ if tid is None:
+ tid = thread.get_ident()
+ self._tid2out[tid] = [0, writefunc]
+
+ def delwritefunc(self, tid=None, ignoremissing=True):
+ if tid is None:
+ tid = thread.get_ident()
+ try:
+ del self._tid2out[tid]
+ except KeyError:
+ if not ignoremissing:
+ raise
+
+ def _get(self):
+ tid = thread.get_ident()
+ try:
+ return self._tid2out[tid]
+ except KeyError:
+ return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter
+
+ def write(self, data):
+ softspace, out = self._get()
+ out(data)
+
+ def flush(self):
+ pass
+
Copied: py/branch/py-collect/thread/pool.py (from r10656, py/branch/py-collect/execnet/gateway.py)
==============================================================================
--- py/branch/py-collect/execnet/gateway.py (original)
+++ py/branch/py-collect/thread/pool.py Fri Apr 15 17:37:15 2005
@@ -1,37 +1,6 @@
-import sys
-import os
-import thread, threading
-import Queue
-import traceback
-import atexit
-import time
-
-
-# XXX the following line should not be here
-g = globals()
-if 'Message' not in g:
- from py.code import Source
- from py.__impl__.execnet.channel import ChannelFactory, Channel
- from py.__impl__.execnet.message import Message
-
-assert Message and ChannelFactory, "Import/Configuration Error"
-
-import os
-debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
-
-sysex = (KeyboardInterrupt, SystemExit)
-
-class RemoteError(EOFError):
- """ Contains an Exceptions from the other side. """
- def __init__(self, formatted):
- self.formatted = formatted
- EOFError.__init__(self)
-
- def __str__(self):
- return self.formatted
-
- def __repr__(self):
- return "%s: %s" %(self.__class__.__name__, self.formatted)
+import Queue
+import threading
+import time
class WorkerThread(threading.Thread):
def __init__(self, pool):
@@ -83,9 +52,6 @@
thread = self._newthread()
thread.handle(task)
- def __del__(self):
- self.shutdown()
-
def shutdown(self, timeout=1.0):
if not self._shutdown:
self._shutdown = True
@@ -154,319 +120,3 @@
thread.start()
l.append(thread)
-class ThreadOut(object):
- def __new__(cls, obj, attrname):
- """ Divert file output to per-thread writefuncs.
- the given obj and attrname describe the destination
- of the file.
- """
- current = getattr(obj, attrname)
- if isinstance(current, cls):
- current._used += 1
- return current
- self = object.__new__(cls)
- self._tid2out = {}
- self._used = 1
- self._oldout = getattr(obj, attrname)
- self._defaultwriter = self._oldout.write
- self._address = (obj, attrname)
- setattr(obj, attrname, self)
- return self
-
- def setdefaultwriter(self, writefunc):
- self._defaultwriter = writefunc
-
- def resetdefault(self):
- self._defaultwriter = self._oldout.write
-
- def softspace():
- def fget(self):
- return self._get()[0]
- def fset(self, value):
- self._get()[0] = value
- return property(fget, fset, None, "software attribute")
- softspace = softspace()
-
- def deinstall(self):
- self._used -= 1
- x = self._used
- if x <= 0:
- obj, attrname = self._address
- setattr(obj, attrname, self._oldout)
-
- def setwritefunc(self, writefunc, tid=None):
- assert callable(writefunc)
- if tid is None:
- tid = thread.get_ident()
- self._tid2out[tid] = [0, writefunc]
-
- def delwritefunc(self, tid=None, ignoremissing=True):
- if tid is None:
- tid = thread.get_ident()
- try:
- del self._tid2out[tid]
- except KeyError:
- if not ignoremissing:
- raise
-
- def _get(self):
- tid = thread.get_ident()
- try:
- return self._tid2out[tid]
- except KeyError:
- return getattr(self._defaultwriter, 'softspace', 0), self._defaultwriter
-
- def write(self, data):
- softspace, out = self._get()
- out(data)
-
- def flush(self):
- pass
-
-class Gateway(object):
- num_worker_threads = 2
- RemoteError = RemoteError
- ThreadOut = ThreadOut
-
- def __init__(self, io, startcount=2, maxthreads=None):
- self._execpool = WorkerPool()
- self.running = True
- self.io = io
- self._outgoing = Queue.Queue()
- self.channelfactory = ChannelFactory(self, startcount)
- self._exitlock = threading.Lock()
- self.pool = NamedThreadPool(receiver = self.thread_receiver,
- sender = self.thread_sender)
- if not _gateways:
- atexit.register(cleanup_atexit)
- _gateways.append(self)
-
- def __repr__(self):
- R = len(self.pool.getstarted('receiver')) and "receiving" or "not receiving"
- S = len(self.pool.getstarted('sender')) and "sending" or "not sending"
- i = len(self.channelfactory.values())
- return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__,
- R, S, i)
-
- def _stopexec(self):
- #self.pool.prunestopped()
- self._execpool.shutdown()
-
- def exit(self):
- # note that threads may still be scheduled to start
- # during our execution!
- self._exitlock.acquire()
- try:
- if self.running:
- self.running = False
- self._stopexec()
- if self.pool.getstarted('sender'):
- self._outgoing.put(Message.EXIT_GATEWAY())
- self.trace("exit procedure triggered, pid %d, gateway %r" % (
- os.getpid(), self))
- _gateways.remove(self)
- finally:
- self._exitlock.release()
-
- def join(self):
- current = threading.currentThread()
- for x in self.pool.getstarted():
- if x != current:
- self.trace("joining %s" % x)
- x.join()
- self._execpool.join()
- self.trace("joining threads finished, current %r" % current)
-
- def trace(self, *args):
- if debug:
- try:
- l = "\n".join(args).split(os.linesep)
- id = getid(self)
- for x in l:
- print >>debug, x
- debug.flush()
- except sysex:
- raise
- except:
- traceback.print_exc()
- def traceex(self, excinfo):
- l = traceback.format_exception(*excinfo)
- errortext = "".join(l)
- self.trace(errortext)
-
- def thread_receiver(self):
- """ thread to read and handle Messages half-sync-half-async. """
- try:
- while 1:
- try:
- msg = Message.readfrom(self.io)
- self.trace("received <- %r" % msg)
- msg.received(self)
- except sysex:
- raise
- except:
- self.traceex(sys.exc_info())
- break
- finally:
- self.trace('leaving %r' % threading.currentThread())
-
- def thread_sender(self):
- """ thread to send Messages over the wire. """
- try:
- while 1:
- msg = self._outgoing.get()
- try:
- msg.writeto(self.io)
- except:
- excinfo = sys.exc_info()
- self.traceex(excinfo)
- msg.post_sent(self, excinfo)
- raise
- else:
- self.trace('sent -> %r' % msg)
- msg.post_sent(self)
- finally:
- self.trace('leaving %r' % threading.currentThread())
-
- def _redirect_thread_output(self, outid, errid):
- l = []
- for name, id in ('stdout', outid), ('stderr', errid):
- if id:
- channel = self._makechannel(outid)
- out = ThreadOut(sys, name)
- out.setwritefunc(channel.send)
- l.append((out, channel))
- def close():
- for out, channel in l:
- out.delwritefunc()
- channel.close()
- return close
-
- def _makechannel(self, newid):
- newchannel = Channel(self, newid)
- self.channelfactory[newid] = newchannel
- return newchannel
-
- def thread_executor(self, channel, (source, outid, errid)):
- """ worker thread to execute source objects from the execution queue. """
- try:
- loc = { 'channel' : channel }
- self.trace("execution starts:", repr(source)[:50])
- close = self._redirect_thread_output(outid, errid)
- try:
- co = compile(source+'\n', '', 'exec', 4096)
- exec co in loc
- finally:
- close()
- self.trace("execution finished:", repr(source)[:50])
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- excinfo = sys.exc_info()
- l = traceback.format_exception(*excinfo)
- errortext = "".join(l)
- channel.close(errortext)
- self.trace(errortext)
- else:
- channel.close()
-
- def _scheduleexec(self, channel, sourcetask):
- self.trace("dispatching exec")
- self._execpool.dispatch(self.thread_executor, channel, sourcetask)
-
- def _dispatchcallback(self, callback, data):
- # XXX this should run in a separate thread because
- # we might otherwise block the receiver thread
- # where we get called from
- callback(data)
-
- def _remote_exec(self, channel, source, stdout=None, stderr=None):
- try:
- source = str(Source(source))
- except NameError:
- try:
- import py
- source = str(py.code.Source(source))
- except ImportError:
- pass
- outid = self._redirectchannelid(stdout)
- errid = self._redirectchannelid(stderr)
- self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
- (source, outid, errid)))
-
- def _redirectchannelid(self, callback):
- if callback is None:
- return
- if hasattr(callback, 'write'):
- callback = callback.write
- assert callable(callback)
- chan = self.newchannel()
- chan.setcallback(callback)
- return chan.id
-
- # _____________________________________________________________________
- #
- # High Level Interface
- # _____________________________________________________________________
- #
- def newchannel(self):
- """ return new channel object. """
- return self.channelfactory.new()
-
- def remote_exec(self, source, stdout=None, stderr=None):
- """ return channel object for communicating with the asynchronously
- executing 'source' code which will have a corresponding 'channel'
- object in its executing namespace. If a channel object is not
- provided a new channel will be created. If a channel is provided
- is will be returned as well.
- """
- channel = self.newchannel()
- channel.remote_exec(source, stdout=stdout, stderr=stderr)
- return channel
-
- def remote_redirect(self, stdout=None, stderr=None):
- """ return a handle representing a redirection of of remote
- end's stdout to a local file object. with handle.close()
- the redirection will be reverted.
- """
- clist = []
- for name, out in ('stdout', stdout), ('stderr', stderr):
- if out:
- outchannel = self.newchannel()
- outchannel.setcallback(getattr(out, 'write', out))
- channel = self.remote_exec("""
- import sys
- outchannel = channel.receive()
- outchannel.gateway.ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
- """ % name)
- channel.send(outchannel)
- clist.append(channel)
- for c in clist:
- c.waitclose(1.0)
- class Handle:
- def close(_):
- for name, out in ('stdout', stdout), ('stderr', stderr):
- if out:
- c = self.remote_exec("""
- import sys
- channel.gateway.ThreadOut(sys, %r).resetdefault()
- """ % name)
- c.waitclose(1.0)
- return Handle()
-
-def getid(gw, cache={}):
- name = gw.__class__.__name__
- try:
- return cache.setdefault(name, {})[id(gw)]
- except KeyError:
- cache[name][id(gw)] = x = "%s:%s.%d" %(os.getpid(), gw.__class__.__name__, len(cache[name]))
- return x
-
-_gateways = []
-def cleanup_atexit():
- if debug:
- print >>debug, "="*20 + "cleaning up" + "=" * 20
- debug.flush()
- while _gateways:
- x = _gateways[-1]
- x.exit()
Copied: py/branch/py-collect/thread/testing/test_threadpool.py (from r10656, py/branch/py-collect/execnet/testing/test_threadpool.py)
==============================================================================
--- py/branch/py-collect/execnet/testing/test_threadpool.py (original)
+++ py/branch/py-collect/thread/testing/test_threadpool.py Fri Apr 15 17:37:15 2005
@@ -1,8 +1,10 @@
-from py.__impl__.execnet.gateway import WorkerPool, ThreadOut
import py
import sys
+WorkerPool = py.thread.WorkerPool
+ThreadOut = py.thread.ThreadOut
+
def test_some():
pool = WorkerPool()
q = py.std.Queue.Queue()
More information about the pytest-commit
mailing list