[py-svn] r10860 - in py/dist/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Tue Apr 19 17:14:03 CEST 2005
Author: hpk
Date: Tue Apr 19 17:14:03 2005
New Revision: 10860
Modified:
py/dist/py/execnet/channel.py
py/dist/py/execnet/gateway.py
py/dist/py/execnet/message.py
py/dist/py/execnet/register.py
py/dist/py/execnet/testing/test_gateway.py
Log:
rename internal methods to make code more readable.
Modified: py/dist/py/execnet/channel.py
==============================================================================
--- py/dist/py/execnet/channel.py (original)
+++ py/dist/py/execnet/channel.py Tue Apr 19 17:14:03 2005
@@ -42,7 +42,7 @@
#
# internal methods, called from the receiver thread
#
- def _do_close(self, stickyerror=EOFError()):
+ def _local_close(self, stickyerror=EOFError()):
if self.id in self.gateway.channelfactory:
del self.gateway.channelfactory[self.id]
self._stickyerror = stickyerror
@@ -52,21 +52,21 @@
self._closeevent.set()
- def _do_receivechannel(self, newid):
+ def _local_receivechannel(self, newid):
""" receive a remotely created new (sub)channel. """
newchannel = Channel(self.gateway, newid)
self.gateway.channelfactory[newid] = newchannel
#self._depchannel.append(newchannel)
self._items.put(newchannel)
- def _do_receivedata(self, data):
+ def _local_receivedata(self, data):
if self._callback is not None:
- self.gateway._dispatchcallback(self._callback, data)
+ self.gateway._local_dispatchcallback(self._callback, data)
else:
self._items.put(data)
- def _do_scheduleexec(self, sourcetask):
- self.gateway._scheduleexec(channel=self, sourcetask=sourcetask)
+ def _local_schedulexec(self, sourcetask):
+ self.gateway._local_schedulexec(channel=self, sourcetask=sourcetask)
#
# public API for channel objects
@@ -94,10 +94,7 @@
put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
else:
put(Message.CHANNEL_CLOSE(self.id))
- self._do_close()
-
- def remote_exec(self, source, stdout=None, stderr=None):
- self.gateway._remote_exec(self, source, stdout, stderr)
+ self._local_close()
def newchannel(self):
""" return a new independent channel."""
@@ -164,7 +161,7 @@
class ChannelFactory(object):
def __init__(self, gateway, startcount=1):
self._dict = dict()
- self._lock = threading.RLock()
+ self._lock = threading.Lock()
self.gateway = gateway
self.count = startcount
Modified: py/dist/py/execnet/gateway.py
==============================================================================
--- py/dist/py/execnet/gateway.py (original)
+++ py/dist/py/execnet/gateway.py Tue Apr 19 17:14:03 2005
@@ -45,34 +45,45 @@
return "<%s %s/%s (%d active channels)>" %(self.__class__.__name__,
R, S, i)
- def _stopexec(self):
- #self.pool.prunestopped()
- self._execpool.shutdown()
+ def _local_trystopexec(self):
+ try:
+ self._execpool.shutdown()
+ except IOError:
+ return False
+ return True
def exit(self):
+ """ initiate full gateway teardown.
+ Note that the teardown of sender/receiver threads happens
+ asynchronously and timeouts on stopping worker execution
+ threads are ignored. You can issue join() or join(joinexec=False)
+ if you want to wait for a full teardown (possibly excluding
+ execution threads).
+ """
# note that threads may still be scheduled to start
# during our execution!
self._exitlock.acquire()
try:
if self.running:
self.running = False
- self._stopexec()
- if self.pool.getstarted('sender'):
- self._outgoing.put(Message.EXIT_GATEWAY())
- self.trace("exit procedure triggered, pid %d, gateway %r" % (
- os.getpid(), self))
+ if not self.pool.getstarted('sender'):
+ raise IOError("sender thread not alive anymore!")
+ self._outgoing.put(Message.EXIT_GATEWAY())
+ self.trace("exit procedure triggered, pid %d " % (os.getpid(),))
_gateways.remove(self)
finally:
self._exitlock.release()
- def join(self):
+ def join(self, joinexec=True):
current = threading.currentThread()
for x in self.pool.getstarted():
if x != current:
self.trace("joining %s" % x)
x.join()
- self._execpool.join()
- self.trace("joining threads finished, current %r" % current)
+ self.trace("joining sender/reciver threads finished, current %r" % current)
+ if joinexec:
+ self._execpool.join()
+ self.trace("joining execution threads finished, current %r" % current)
def trace(self, *args):
if debug:
@@ -125,11 +136,11 @@
finally:
self.trace('leaving %r' % threading.currentThread())
- def _redirect_thread_output(self, outid, errid):
+ def _local_redirect_thread_output(self, outid, errid):
l = []
for name, id in ('stdout', outid), ('stderr', errid):
if id:
- channel = self._makechannel(outid)
+ channel = self._local_makechannelobject(outid)
out = ThreadOut(sys, name)
out.setwritefunc(channel.send)
l.append((out, channel))
@@ -139,7 +150,7 @@
channel.close()
return close
- def _makechannel(self, newid):
+ def _local_makechannelobject(self, newid):
newchannel = Channel(self, newid)
self.channelfactory[newid] = newchannel
return newchannel
@@ -149,7 +160,7 @@
try:
loc = { 'channel' : channel }
self.trace("execution starts:", repr(source)[:50])
- close = self._redirect_thread_output(outid, errid)
+ close = self._local_redirect_thread_output(outid, errid)
try:
co = compile(source+'\n', '', 'exec', 4096)
exec co in loc
@@ -167,31 +178,17 @@
else:
channel.close()
- def _scheduleexec(self, channel, sourcetask):
+ def _local_schedulexec(self, channel, sourcetask):
self.trace("dispatching exec")
self._execpool.dispatch(self.thread_executor, channel, sourcetask)
- def _dispatchcallback(self, callback, data):
+ def _local_dispatchcallback(self, callback, data):
# XXX this should run in a separate thread because
# we might otherwise block the receiver thread
# where we get called from
callback(data)
- def _remote_exec(self, channel, source, stdout=None, stderr=None):
- try:
- source = str(Source(source))
- except NameError:
- try:
- import py
- source = str(py.code.Source(source))
- except ImportError:
- pass
- outid = self._redirectchannelid(stdout)
- errid = self._redirectchannelid(stderr)
- self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
- (source, outid, errid)))
-
- def _redirectchannelid(self, callback):
+ def _newredirectchannelid(self, callback):
if callback is None:
return
if hasattr(callback, 'write'):
@@ -210,15 +207,26 @@
""" return new channel object. """
return self.channelfactory.new()
- def remote_exec(self, source, stdout=None, stderr=None):
+ def remote_exec(self, source, stdout=None, stderr=None, channel=None):
""" return channel object for communicating with the asynchronously
executing 'source' code which will have a corresponding 'channel'
object in its executing namespace. If a channel object is not
- provided a new channel will be created. If a channel is provided
- is will be returned as well.
+ provided a new channel will be created.
"""
- channel = self.newchannel()
- channel.remote_exec(source, stdout=stdout, stderr=stderr)
+ try:
+ source = str(Source(source))
+ except NameError:
+ try:
+ import py
+ source = str(py.code.Source(source))
+ except ImportError:
+ pass
+ if channel is None:
+ channel = self.newchannel()
+ outid = self._newredirectchannelid(stdout)
+ errid = self._newredirectchannelid(stderr)
+ self._outgoing.put(Message.CHANNEL_OPEN(channel.id,
+ (source, outid, errid)))
return channel
def remote_redirect(self, stdout=None, stderr=None):
Modified: py/dist/py/execnet/message.py
==============================================================================
--- py/dist/py/execnet/message.py (original)
+++ py/dist/py/execnet/message.py Tue Apr 19 17:14:03 2005
@@ -63,14 +63,14 @@
class EXIT_GATEWAY(Message):
def received(self, gateway):
# executes in receiver thread
- gateway._stopexec()
for x in gateway.channelfactory.values():
- x._do_close()
+ x._local_close()
gateway._outgoing.put(self.STOP_RECEIVING())
+ gateway._local_trystopexec()
raise SystemExit
def post_sent(self, gateway, excinfo=None):
- gateway._stopexec()
# executes in sender thread
+ gateway._local_trystopexec()
gateway.io.close_write()
raise SystemExit
@@ -81,7 +81,7 @@
# already. With sockets closing it would raise
# a Transport Not Connected exception
for x in gateway.channelfactory.values():
- x._do_close()
+ x._local_close()
raise SystemExit
def post_sent(self, gateway, excinfo=None):
# after we sent STOP_RECEIVING we don't
@@ -92,28 +92,28 @@
class CHANNEL_OPEN(Message):
def received(self, gateway):
channel = gateway.channelfactory.new(self.channelid)
- channel._do_scheduleexec(self.data)
+ channel._local_schedulexec(self.data)
class CHANNEL_NEW(Message):
def received(self, gateway):
newid = self.data
channel = gateway.channelfactory[self.channelid]
- channel._do_receivechannel(newid)
+ channel._local_receivechannel(newid)
class CHANNEL_DATA(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._do_receivedata(self.data)
+ channel._local_receivedata(self.data)
class CHANNEL_CLOSE(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._do_close()
+ channel._local_close()
class CHANNEL_CLOSE_ERROR(Message):
def received(self, gateway):
channel = gateway.channelfactory[self.channelid]
- channel._do_close(channel.RemoteError(self.data))
+ channel._local_close(channel.RemoteError(self.data))
classes = [x for x in locals().values() if hasattr(x, '__bases__')]
classes.sort(lambda x,y : cmp(x.__name__, y.__name__))
Modified: py/dist/py/execnet/register.py
==============================================================================
--- py/dist/py/execnet/register.py (original)
+++ py/dist/py/execnet/register.py Tue Apr 19 17:14:03 2005
@@ -42,7 +42,7 @@
"""
bootstrap = ["we_are_remote=True", extra]
bootstrap += [getsource(x) for x in startup_modules]
- bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join()",]
+ bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",]
source = "\n".join(bootstrap)
self.trace("sending gateway bootstrap code")
io.write('%r\n' % source)
Modified: py/dist/py/execnet/testing/test_gateway.py
==============================================================================
--- py/dist/py/execnet/testing/test_gateway.py (original)
+++ py/dist/py/execnet/testing/test_gateway.py Tue Apr 19 17:14:03 2005
@@ -125,14 +125,14 @@
assert x == 42
py.test.raises(channel.RemoteError, channel.receive)
- def test_channel__do_close(self):
+ def test_channel__local_close(self):
channel = self.gw.channelfactory.new()
- channel._do_close()
+ channel._local_close()
channel.waitclose(0.1)
- def test_channel__do_close_error(self):
+ def test_channel__local_close_error(self):
channel = self.gw.channelfactory.new()
- channel._do_close(channel.RemoteError("error"))
+ channel._local_close(channel.RemoteError("error"))
py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
def test_channel_iter(self):
@@ -166,7 +166,7 @@
channel = self.gw.newchannel()
l = []
channel.setcallback(l.append)
- channel.remote_exec('''
+ self.gw.remote_exec(channel=channel, source='''
channel.send(42)
channel.send(13)
''')
@@ -243,6 +243,22 @@
assert first.strip() == 'hello world'
py.test.raises(EOFError, channel.receive)
+#class TestBlockingIssues:
+# def test_join_blocked_execution_gateway(self):
+# gateway = py.execnet.PopenGateway()
+# channel = gateway.remote_exec("""
+# time.sleep(5.0)
+# """)
+# def doit():
+# gateway.exit()
+# gateway.join(joinexec=True)
+# return 17
+#
+# pool = py._thread.WorkerPool()
+# reply = pool.dispatch(doit)
+# x = reply.get(timeout=1.0)
+# assert x == 17
+
class TestBasicPopenGateway(PopenGatewayTestSetup, BasicRemoteExecution):
#disabled = True
def test_many_popen(self):
More information about the pytest-commit
mailing list