[py-svn] r39992 - py/branch/lessthread/py/execnet
hpk at codespeak.net
hpk at codespeak.net
Tue Mar 6 18:35:03 CET 2007
Author: hpk
Date: Tue Mar 6 18:35:02 2007
New Revision: 39992
Modified:
py/branch/lessthread/py/execnet/gateway.py
py/branch/lessthread/py/execnet/register.py
Log:
removing execution threads by default
Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py (original)
+++ py/branch/lessthread/py/execnet/gateway.py Tue Mar 6 18:35:02 2007
@@ -22,7 +22,7 @@
from py.__.execnet.channel import ChannelFactory, Channel
from py.__.execnet.message import Message
ThreadOut = py._thread.ThreadOut
- WorkerPool = py._thread.WorkerPool
+ WorkerPool = py._thread.WorkerPool
import os
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
@@ -31,20 +31,29 @@
class Gateway(object):
_ThreadOut = ThreadOut
+ _workerpool = None
remoteaddress = ""
def __init__(self, io, execthreads=None, _startcount=2):
""" initialize core gateway, using the given
inputoutput object and 'execthreads' execution
- threads.
+ threads. if 'execthreads' is -1, no execution
+ will be allowed on this side. If it is None,
+ requests will be queued and _servemain() may
+ be used to execute them.
"""
global registered_cleanup, _activegateways
- self._execpool = WorkerPool(maxthreads=execthreads)
self._io = io
self._channelfactory = ChannelFactory(self, _startcount)
if not registered_cleanup:
atexit.register(cleanup_atexit)
registered_cleanup = True
_activegateways[self] = True
+ if execthreads != -1:
+ if execthreads is not None:
+ self._workerpool = WorkerPool(maxthreads=execthreads)
+ else:
+ self._requestqueue = Queue.Queue()
+
self._receiverthread = threading.Thread(name="receiver",
target=self._thread_receiver)
self._receiverthread.setDaemon(0)
@@ -110,7 +119,8 @@
self._traceex(exc_info())
break
finally:
- self._send(None)
+ self._stopexec()
+ self._stopsend()
self._channelfactory._finished_receiving()
self._trace('leaving %r' % threading.currentThread())
@@ -143,10 +153,31 @@
channel.close()
return close
- def _thread_executor(self, channel, (source, outid, errid)):
- """ worker thread to execute source objects from the execution queue. """
+ def _local_schedulexec(self, channel, sourcetask):
+ if self._workerpool:
+ self._workerpool.dispatch(self._executetask, channel, sourcetask)
+ elif hasattr(self, '_requestqueue'):
+ self._requestqueue.put((channel, sourcetask))
+
+ def _servemain(self, joining=True):
from sys import exc_info
try:
+ while 1:
+ item = self._requestqueue.get()
+ if item is None:
+ self._stopsend()
+ break
+ self._executetask(item) # could be done in an exec thread
+ finally:
+ self._trace("_servemain finished")
+ if self.joining:
+ self.join()
+
+ def _executetask(self, item):
+ """ execute channel/source items. """
+ from sys import exc_info
+ channel, (source, outid, errid) = item
+ try:
loc = { 'channel' : channel }
self._trace("execution starts:", repr(source)[:50])
close = self._local_redirect_thread_output(outid, errid)
@@ -168,10 +199,6 @@
else:
channel.close()
- def _local_schedulexec(self, channel, sourcetask):
- self._trace("dispatching exec")
- self._execpool.dispatch(self._thread_executor, channel, sourcetask)
-
def _newredirectchannelid(self, callback):
if callback is None:
return
@@ -245,8 +272,18 @@
return Handle()
def exit(self):
- """ Try to stop all IO activity. """
- self._send(None)
+ """ Try to stop all exec and IO activity. """
+ self._stopexec()
+ self._stopsend()
+
+ def _stopsend(self):
+ self._send(None)
+
+ def _stopexec(self):
+ if hasattr(self, '_requestqueue'):
+ self._requestqueue.put(None)
+ if self._workerpool:
+ self._workerpool.shutdown()
def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity)
@@ -257,8 +294,9 @@
self._trace("joining receiver thread")
self._receiverthread.join()
if joinexec:
- self._execpool.join()
- self._trace("joining execution threads finished, current %r" % current)
+ if self._workerpool:
+ self._workerpool.join()
+ self._trace("joining execution threads finished, current %r" % current)
def getid(gw, cache={}):
name = gw.__class__.__name__
Modified: py/branch/lessthread/py/execnet/register.py
==============================================================================
--- py/branch/lessthread/py/execnet/register.py (original)
+++ py/branch/lessthread/py/execnet/register.py Tue Mar 6 18:35:02 2007
@@ -41,7 +41,7 @@
bootstrap = [extra]
bootstrap += [getsource(x) for x in startup_modules]
bootstrap += [io.server_stmt,
- "Gateway(io=io, _startcount=2).join(joinexec=False)",
+ "Gateway(io=io, _startcount=2)._servemain()",
]
source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code")
More information about the pytest-commit mailing list