[py-svn] r37822 - in py/trunk/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Fri Feb 2 20:57:49 CET 2007
Author: hpk
Date: Fri Feb 2 20:57:47 2007
New Revision: 37822
Modified:
py/trunk/py/execnet/gateway.py
py/trunk/py/execnet/register.py
py/trunk/py/execnet/testing/test_gateway.py
Log:
added lots of docstrings, general cleanup
Modified: py/trunk/py/execnet/gateway.py
==============================================================================
--- py/trunk/py/execnet/gateway.py (original)
+++ py/trunk/py/execnet/gateway.py Fri Feb 2 20:57:47 2007
@@ -33,23 +33,25 @@
class Gateway(object):
_ThreadOut = ThreadOut
remoteaddress = ""
-
- def __init__(self, io, startcount=2, maxthreads=None):
+ def __init__(self, io, execthreads=None, _startcount=2):
+ """ initialize core gateway, using the given
+ inputoutput object and 'execthreads' execution
+ threads.
+ """
global registered_cleanup
- self._execpool = WorkerPool(maxthreads=maxthreads)
-## self.running = True
+ self._execpool = WorkerPool(maxthreads=execthreads)
self._io = io
self._outgoing = Queue.Queue()
- self._channelfactory = ChannelFactory(self, startcount)
-## self._exitlock = threading.Lock()
+ self._channelfactory = ChannelFactory(self, _startcount)
if not registered_cleanup:
atexit.register(cleanup_atexit)
registered_cleanup = True
_active_sendqueues[self._outgoing] = True
self._pool = NamedThreadPool(receiver = self._thread_receiver,
- sender = self._thread_sender)
+ sender = self._thread_sender)
def __repr__(self):
+ """ return string representing gateway type and status. """
addr = self.remoteaddress
if addr:
addr = '[%s]' % (addr,)
@@ -199,13 +201,12 @@
return self._channelfactory.new()
def remote_exec(self, source, stdout=None, stderr=None):
- """ return channel object for communicating with the asynchronously
- executing 'source' code which will have a corresponding 'channel'
- object in its executing namespace.
-
- You may provide callback functions 'stdout' and 'stderr'
- which will get called with the remote stdout/stderr output
- piece by piece.
+ """ return channel object and connect it to a remote
+ execution thread where the given 'source' executes
+ and has the sister 'channel' object in its global
+ namespace. The callback functions 'stdout' and
+ 'stderr' get called on receipt of remote
+ stdout/stderr output strings.
"""
try:
source = str(Source(source))
@@ -252,29 +253,8 @@
c.waitclose(1.0)
return Handle()
-## def exit(self):
-## """ initiate full gateway teardown.
-## Note that the teardown of sender/receiver threads happens
-## asynchronously and timeouts on stopping worker execution
-## threads are ignored. You can issue join() or join(joinexec=False)
-## if you want to wait for a full teardown (possibly excluding
-## execution threads).
-## """
-## # note that threads may still be scheduled to start
-## # during our execution!
-## self._exitlock.acquire()
-## try:
-## if self.running:
-## self.running = False
-## if not self._pool.getstarted('sender'):
-## raise IOError("sender thread not alive anymore!")
-## self._outgoing.put(None)
-## self._trace("exit procedure triggered, pid %d " % (os.getpid(),))
-## _gateways.remove(self)
-## finally:
-## self._exitlock.release()
-
def exit(self):
+ """ Try to stop all IO activity. """
try:
del _active_sendqueues[self._outgoing]
except KeyError:
@@ -283,6 +263,9 @@
self._outgoing.put(None)
def join(self, joinexec=True):
+ """ Wait for all IO (and by default all execution activity)
+ to stop.
+ """
current = threading.currentThread()
for x in self._pool.getstarted():
if x != current:
Modified: py/trunk/py/execnet/register.py
==============================================================================
--- py/trunk/py/execnet/register.py (original)
+++ py/trunk/py/execnet/register.py Fri Feb 2 20:57:47 2007
@@ -7,9 +7,7 @@
# the list of modules that must be sent to the other side
# for bootstrapping gateways
-# XXX we want to have a cleaner bootstrap mechanism
-# by making sure early that we have the py lib available
-# in a sufficient version
+# XXX we'd like to have a leaner and meaner bootstrap mechanism
startup_modules = [
'py.__.thread.io',
@@ -30,7 +28,7 @@
""" initialize gateways on both sides of a inputoutput object. """
def __init__(self, io):
self._remote_bootstrap_gateway(io)
- super(InstallableGateway, self).__init__(io=io, startcount=1)
+ super(InstallableGateway, self).__init__(io=io, _startcount=1)
def _remote_bootstrap_gateway(self, io, extra=''):
""" return Gateway with a asynchronously remotely
@@ -42,7 +40,9 @@
"""
bootstrap = [extra]
bootstrap += [getsource(x) for x in startup_modules]
- bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",]
+ bootstrap += [io.server_stmt,
+ "Gateway(io=io, _startcount=2).join(joinexec=False)",
+ ]
source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code")
io.write('%r\n' % source)
@@ -52,45 +52,20 @@
infile, outfile = os.popen2(cmd)
io = inputoutput.Popen2IO(infile, outfile)
super(PopenCmdGateway, self).__init__(io=io)
-## self._pidchannel = self.remote_exec("""
-## import os
-## channel.send(os.getpid())
-## """)
-
-## def exit(self):
-## try:
-## self._pidchannel.waitclose(timeout=0.5)
-## pid = self._pidchannel.receive()
-## except IOError:
-## self._trace("IOError: could not receive child PID:")
-## self._traceex(sys.exc_info())
-## pid = None
-## super(PopenCmdGateway, self).exit()
-## if pid is not None:
-## self._trace("waiting for pid %s" % pid)
-## try:
-## os.waitpid(pid, 0)
-## except KeyboardInterrupt:
-## if sys.platform != "win32":
-## os.kill(pid, 15)
-## raise
-## except OSError, e:
-## self._trace("child process %s already dead? error:%s" %
-## (pid, str(e)))
class PopenGateway(PopenCmdGateway):
- # use sysfind/sysexec/subprocess instead of os.popen?
+ """ This Gateway provides interaction with a newly started
+ python subprocess.
+ """
def __init__(self, python=sys.executable):
+ """ instantiate a gateway to a subprocess
+ started with the given 'python' executable.
+ """
cmd = '%s -u -c "exec input()"' % python
super(PopenGateway, self).__init__(cmd)
def _remote_bootstrap_gateway(self, io, extra=''):
- # XXX the following hack helps us to import the same version
- # of the py lib and other dependcies, but only works for
- # PopenGateways because we can assume to have access to
- # the same filesystem
- # --> we definitely need proper remote imports working
- # across any kind of gateway!
+ # have the subprocess use the same PYTHONPATH and py lib
x = py.path.local(py.__file__).dirpath().dirpath()
ppath = os.environ.get('PYTHONPATH', '')
plist = [str(x)] + ppath.split(':')
@@ -98,48 +73,66 @@
"import sys ; sys.path[:0] = %r" % (plist,),
"import os ; os.environ['PYTHONPATH'] = %r" % ppath,
str(py.code.Source(stdouterrin_setnull)),
- "stdouterrin_setnull()",
+ "stdouterrin_setnull()",
""
])
super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
class SocketGateway(InstallableGateway):
+ """ This Gateway provides interaction with a remote process
+ by connecting to a specified socket. On the remote
+ side you need to manually start a small script
+ (py/execnet/script/socketserver.py) that accepts
+ SocketGateway connections.
+ """
def __init__(self, host, port):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ """ instantiate a gateway to a process accessed
+ via a host/port specified socket.
+ """
self.host = host = str(host)
self.port = port = int(port)
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
io = inputoutput.SocketIO(sock)
super(SocketGateway, self).__init__(io=io)
self.remoteaddress = '%s:%d' % (self.host, self.port)
- def remote_install(cls, gateway, hostport=None):
- """ return a connected socket gateway through the
- given gateway.
+ def new_remote(cls, gateway, hostport=None):
+ """ return a new (connected) socket gateway, instantiated
+ indirectly through the given 'gateway'.
"""
if hostport is None:
host, port = ('', 0) # XXX works on all platforms?
else:
host, port = hostport
socketserverbootstrap = py.code.Source(
- mypath.dirpath('script', 'socketserver.py').read('rU'),
- """
+ mypath.dirpath('script', 'socketserver.py').read('rU'), """
import socket
sock = bind_and_listen((%r, %r))
port = sock.getsockname()
channel.send(port)
startserver(sock)
- """ % (host, port))
+ """ % (host, port)
+ )
# execute the above socketserverbootstrap on the other side
channel = gateway.remote_exec(socketserverbootstrap)
(realhost, realport) = channel.receive()
- #gateway._trace("remote_install received"
- # "port=%r, hostname = %r" %(realport, hostname))
+ #gateway._trace("new_remote received"
+ # "port=%r, hostname = %r" %(realport, hostname))
return py.execnet.SocketGateway(host, realport)
- remote_install = classmethod(remote_install)
+ new_remote = classmethod(new_remote)
+
class SshGateway(PopenCmdGateway):
+ """ This Gateway provides interaction with a remote process,
+ established via the 'ssh' command line binary.
+ The remote side needs to have a Python interpreter executable.
+ """
def __init__(self, sshaddress, remotepython='python', identity=None):
+ """ instantiate a remote ssh process with the
+ given 'sshaddress' and remotepython version.
+ you may specify an 'identity' filepath.
+ """
self.remoteaddress = sshaddress
remotecmd = '%s -u -c "exec input()"' % (remotepython,)
cmdline = [sshaddress, remotecmd]
@@ -160,8 +153,11 @@
])
super(SshGateway, self)._remote_bootstrap_gateway(io, extra)
+
def stdouterrin_setnull():
- # redirect file descriptors 0 and 1 to /dev/null, to avoid
+ """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.
+ note that this function may run remotely without py lib support.
+ """
# the redirection avoids complete confusion (this is independent from the sys.stdout
# and sys.stderr redirection that gateway.remote_exec() can do)
# note that we redirect fd 2 on win too, since for some reason that
@@ -188,55 +184,3 @@
os.dup2(fd, 2)
os.close(fd)
-# XXX
-# XXX unusued code below
-# XXX
-
-class ExecGateway(PopenGateway):
- def remote_exec_sync_stdcapture(self, lines, callback):
- # hack: turn the content of the cell into
- #
- # if 1:
- # line1
- # line2
- # ...
- #
- lines = [' ' + line for line in lines]
- lines.insert(0, 'if 1:')
- lines.append('')
- sourcecode = '\n'.join(lines)
- try:
- callbacks = self.callbacks
- except AttributeError:
- callbacks = self.callbacks = {}
- answerid = id(callback)
- self.callbacks[answerid] = callback
-
- self.exec_remote('''
- import sys, StringIO
- try:
- execns
- except:
- execns = {}
- oldout, olderr = sys.stdout, sys.stderr
- try:
- buffer = StringIO.StringIO()
- sys.stdout = sys.stderr = buffer
- try:
- exec compile(%(sourcecode)r, 'single') in execns
- except:
- import traceback
- traceback.print_exc()
- finally:
- sys.stdout=oldout
- sys.stderr=olderr
- # fiddle us (the caller) into executing the callback on remote answers
- gateway.exec_remote(
- "gateway.invoke_callback(%(answerid)r, %%r)" %% buffer.getvalue())
- ''' % locals())
-
- def invoke_callback(self, answerid, value):
- callback = self.callbacks[answerid]
- del self.callbacks[answerid]
- callback(value)
-
Modified: py/trunk/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py (original)
+++ py/trunk/py/execnet/testing/test_gateway.py Fri Feb 2 20:57:47 2007
@@ -302,10 +302,11 @@
def test_remote_exec_redirect_multi(self):
num = 3
l = [[] for x in range(num)]
- channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append)
+ channels = [self.gw.remote_exec("print %d" % i,
+ stdout=l[i].append)
for i in range(num)]
for x in channels:
- x.waitclose(1.0)
+ x.waitclose(5.0)
for i in range(num):
subl = l[i]
@@ -451,9 +452,9 @@
def setup_class(cls):
# open a gateway to a fresh child process
cls.proxygw = py.execnet.PopenGateway()
- cls.gw = py.execnet.SocketGateway.remote_install(cls.proxygw,
- ("127.0.0.1", 0)
- )
+ cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw,
+ ("127.0.0.1", 0)
+ )
## def teardown_class(cls):
## cls.gw.exit()
More information about the pytest-commit
mailing list