[py-svn] py-trunk commit b571b7e9a9b2: remove py.execnet, substitute py.execnet usages with "execnet" ones.
commits-noreply at bitbucket.org
commits-noreply at bitbucket.org
Fri Oct 2 17:01:19 CEST 2009
# HG changeset patch -- Bitbucket.org
# Project py-trunk
# URL http://bitbucket.org/hpk42/py-trunk/overview/
# User holger krekel <holger at merlinux.eu>
# Date 1254495537 -7200
# Node ID b571b7e9a9b2f2f44d63a58360476de7868fd312
# Parent 2036f394193fe3d3b37cf5b80ea2128f5ae9764e
remove py.execnet, substitute py.execnet usages with "execnet" ones.
--- a/py/execnet/gateway.py
+++ /dev/null
@@ -1,354 +0,0 @@
-"""
-gateway code for initiating popen, socket and ssh connections.
-(c) 2004-2009, Holger Krekel and others
-"""
-
-import sys, os, inspect, socket, atexit, weakref
-import py
-from py.__.execnet.gateway_base import Message, Popen2IO, SocketIO
-from py.__.execnet import gateway_base
-
-debug = False
-
-class GatewayCleanup:
- def __init__(self):
- self._activegateways = weakref.WeakKeyDictionary()
- atexit.register(self.cleanup_atexit)
-
- def register(self, gateway):
- assert gateway not in self._activegateways
- self._activegateways[gateway] = True
-
- def unregister(self, gateway):
- del self._activegateways[gateway]
-
- def cleanup_atexit(self):
- if debug:
- debug.writeslines(["="*20, "cleaning up", "=" * 20])
- debug.flush()
- for gw in list(self._activegateways):
- gw.exit()
- #gw.join() # should work as well
-
-class ExecnetAPI:
- def pyexecnet_gateway_init(self, gateway):
- """ signal initialisation of new gateway. """
- def pyexecnet_gateway_exit(self, gateway):
- """ signal exitting of gateway. """
-
-class InitiatingGateway(gateway_base.BaseGateway):
- """ initialize gateways on both sides of a inputoutput object. """
- # XXX put the next two global variables into an Execnet object
- # which intiaties gateways and passes in appropriate values.
- _cleanup = GatewayCleanup()
- hook = ExecnetAPI()
-
- def __init__(self, io):
- self._remote_bootstrap_gateway(io)
- super(InitiatingGateway, self).__init__(io=io, _startcount=1)
- self._initreceive()
- self.hook = py._com.HookRelay(ExecnetAPI, py._com.comregistry)
- self.hook.pyexecnet_gateway_init(gateway=self)
- self._cleanup.register(self)
-
- def __repr__(self):
- """ return string representing gateway type and status. """
- if hasattr(self, 'remoteaddress'):
- addr = '[%s]' % (self.remoteaddress,)
- else:
- addr = ''
- try:
- r = (self._receiverthread.isAlive() and "receiving" or
- "not receiving")
- s = "sending" # XXX
- i = len(self._channelfactory.channels())
- except AttributeError:
- r = s = "uninitialized"
- i = "no"
- return "<%s%s %s/%s (%s active channels)>" %(
- self.__class__.__name__, addr, r, s, i)
-
- def exit(self):
- """ Try to stop all exec and IO activity. """
- try:
- self._cleanup.unregister(self)
- except KeyError:
- return # we assume it's already happened
- self._stopexec()
- self._stopsend()
- self.hook.pyexecnet_gateway_exit(gateway=self)
-
- def _remote_bootstrap_gateway(self, io, extra=''):
- """ return Gateway with a asynchronously remotely
- initialized counterpart Gateway (which may or may not succeed).
- Note that the other sides gateways starts enumerating
- its channels with even numbers while the sender
- gateway starts with odd numbers. This allows to
- uniquely identify channels across both sides.
- """
- bootstrap = [extra]
- bootstrap += [inspect.getsource(gateway_base)]
- bootstrap += [io.server_stmt,
- "io.write('1'.encode('ascii'))",
- "SlaveGateway(io=io, _startcount=2).serve()",
- ]
- source = "\n".join(bootstrap)
- self._trace("sending gateway bootstrap code")
- #open("/tmp/bootstrap.py", 'w').write(source)
- repr_source = repr(source) + "\n"
- io.write(repr_source.encode('ascii'))
- s = io.read(1)
- assert s == "1".encode('ascii')
-
- def _rinfo(self, update=False):
- """ return some sys/env information from remote. """
- if update or not hasattr(self, '_cache_rinfo'):
- ch = self.remote_exec(rinfo_source)
- self._cache_rinfo = RInfo(**ch.receive())
- return self._cache_rinfo
-
- def remote_exec(self, source):
- """ return channel object and connect it to a remote
- execution thread where the given 'source' executes
- and has the sister 'channel' object in its global
- namespace.
- """
- source = str(py.code.Source(source))
- channel = self.newchannel()
- self._send(Message.CHANNEL_OPEN(channel.id, source))
- return channel
-
- def remote_init_threads(self, num=None):
- """ start up to 'num' threads for subsequent
- remote_exec() invocations to allow concurrent
- execution.
- """
- if hasattr(self, '_remotechannelthread'):
- raise IOError("remote threads already running")
- from py.__.thread import pool
- source = py.code.Source(pool, """
- execpool = WorkerPool(maxthreads=%r)
- gw = channel.gateway
- while 1:
- task = gw._execqueue.get()
- if task is None:
- gw._stopsend()
- execpool.shutdown()
- execpool.join()
- raise gw._StopExecLoop
- execpool.dispatch(gw.executetask, task)
- """ % num)
- self._remotechannelthread = self.remote_exec(source)
-
- def _remote_redirect(self, stdout=None, stderr=None):
- """ return a handle representing a redirection of a remote
- end's stdout to a local file object. with handle.close()
- the redirection will be reverted.
- """
- # XXX implement a remote_exec_in_globals(...)
- # to send ThreadOut implementation over
- clist = []
- for name, out in ('stdout', stdout), ('stderr', stderr):
- if out:
- outchannel = self.newchannel()
- outchannel.setcallback(getattr(out, 'write', out))
- channel = self.remote_exec("""
- import sys
- outchannel = channel.receive()
- ThreadOut(sys, %r).setdefaultwriter(outchannel.send)
- """ % name)
- channel.send(outchannel)
- clist.append(channel)
- for c in clist:
- c.waitclose()
- class Handle:
- def close(_):
- for name, out in ('stdout', stdout), ('stderr', stderr):
- if out:
- c = self.remote_exec("""
- import sys
- channel.gateway._ThreadOut(sys, %r).resetdefault()
- """ % name)
- c.waitclose()
- return Handle()
-
-
-
-class RInfo:
- def __init__(self, **kwargs):
- self.__dict__.update(kwargs)
- def __repr__(self):
- info = ", ".join(["%s=%s" % item
- for item in self.__dict__.items()])
- return "<RInfo %r>" % info
-
-rinfo_source = """
-import sys, os
-channel.send(dict(
- executable = sys.executable,
- version_info = tuple([sys.version_info[i] for i in range(5)]),
- platform = sys.platform,
- cwd = os.getcwd(),
- pid = os.getpid(),
-))
-"""
-
-class PopenCmdGateway(InitiatingGateway):
- def __init__(self, args):
- from subprocess import Popen, PIPE
- self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE)
- io = Popen2IO(p.stdin, p.stdout)
- super(PopenCmdGateway, self).__init__(io=io)
-
- def exit(self):
- super(PopenCmdGateway, self).exit()
- self._popen.poll()
-
-popen_bootstrapline = "import sys ; exec(eval(sys.stdin.readline()))"
-class PopenGateway(PopenCmdGateway):
- """ This Gateway provides interaction with a newly started
- python subprocess.
- """
- def __init__(self, python=None):
- """ instantiate a gateway to a subprocess
- started with the given 'python' executable.
- """
- if not python:
- python = sys.executable
- args = [str(python), '-c', popen_bootstrapline]
- super(PopenGateway, self).__init__(args)
-
- def _remote_bootstrap_gateway(self, io, extra=''):
- # have the subprocess use the same PYTHONPATH and py lib
- x = py.path.local(py.__file__).dirpath().dirpath()
- ppath = os.environ.get('PYTHONPATH', '')
- plist = [str(x)] + ppath.split(':')
- s = "\n".join([extra,
- "import sys ; sys.path[:0] = %r" % (plist,),
- "import os ; os.environ['PYTHONPATH'] = %r" % ppath,
- inspect.getsource(stdouterrin_setnull),
- "stdouterrin_setnull()",
- ""
- ])
- super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
-
-class SocketGateway(InitiatingGateway):
- """ This Gateway provides interaction with a remote process
- by connecting to a specified socket. On the remote
- side you need to manually start a small script
- (py/execnet/script/socketserver.py) that accepts
- SocketGateway connections.
- """
- def __init__(self, host, port):
- """ instantiate a gateway to a process accessed
- via a host/port specified socket.
- """
- self.host = host = str(host)
- self.port = port = int(port)
- self.remoteaddress = '%s:%d' % (self.host, self.port)
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- sock.connect((host, port))
- except socket.gaierror:
- raise HostNotFound(str(sys.exc_info()[1]))
- io = SocketIO(sock)
- super(SocketGateway, self).__init__(io=io)
-
- def new_remote(cls, gateway, hostport=None):
- """ return a new (connected) socket gateway, instatiated
- indirectly through the given 'gateway'.
- """
- if hostport is None:
- host, port = ('', 0) # XXX works on all platforms?
- else:
- host, port = hostport
- mydir = py.path.local(__file__).dirpath()
- socketserverbootstrap = py.code.Source(
- mydir.join('script', 'socketserver.py').read('r'), """
- import socket
- sock = bind_and_listen((%r, %r))
- port = sock.getsockname()
- channel.send(port)
- startserver(sock)
- """ % (host, port)
- )
- # execute the above socketserverbootstrap on the other side
- channel = gateway.remote_exec(socketserverbootstrap)
- (realhost, realport) = channel.receive()
- #gateway._trace("new_remote received"
- # "port=%r, hostname = %r" %(realport, hostname))
- return py.execnet.SocketGateway(host, realport)
- new_remote = classmethod(new_remote)
-
-class HostNotFound(Exception):
- pass
-
-class SshGateway(PopenCmdGateway):
- """ This Gateway provides interaction with a remote Python process,
- established via the 'ssh' command line binary.
- The remote side needs to have a Python interpreter executable.
- """
-
- def __init__(self, sshaddress, remotepython=None, ssh_config=None):
- """ instantiate a remote ssh process with the
- given 'sshaddress' and remotepython version.
- you may specify an ssh_config file.
- """
- self.remoteaddress = sshaddress
- if remotepython is None:
- remotepython = "python"
- args = ['ssh', '-C' ]
- if ssh_config is not None:
- args.extend(['-F', str(ssh_config)])
- remotecmd = '%s -c "%s"' %(remotepython, popen_bootstrapline)
- args.extend([sshaddress, remotecmd])
- super(SshGateway, self).__init__(args)
-
- def _remote_bootstrap_gateway(self, io, s=""):
- extra = "\n".join([
- str(py.code.Source(stdouterrin_setnull)),
- "stdouterrin_setnull()",
- s,
- ])
- try:
- super(SshGateway, self)._remote_bootstrap_gateway(io, extra)
- except EOFError:
- ret = self._popen.wait()
- if ret == 255:
- raise HostNotFound(self.remoteaddress)
-
-def stdouterrin_setnull():
- """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null.
- note that this function may run remotely without py lib support.
- """
- # complete confusion (this is independent from the sys.stdout
- # and sys.stderr redirection that gateway.remote_exec() can do)
- # note that we redirect fd 2 on win too, since for some reason that
- # blocks there, while it works (sending to stderr if possible else
- # ignoring) on *nix
- import sys, os
- if not hasattr(os, 'dup'): # jython
- return
- try:
- devnull = os.devnull
- except AttributeError:
- if os.name == 'nt':
- devnull = 'NUL'
- else:
- devnull = '/dev/null'
- # stdin
- sys.stdin = os.fdopen(os.dup(0), 'r', 1)
- fd = os.open(devnull, os.O_RDONLY)
- os.dup2(fd, 0)
- os.close(fd)
-
- # stdout
- sys.stdout = os.fdopen(os.dup(1), 'w', 1)
- fd = os.open(devnull, os.O_WRONLY)
- os.dup2(fd, 1)
-
- # stderr for win32
- if os.name == 'nt':
- sys.stderr = os.fdopen(os.dup(2), 'w', 1)
- os.dup2(fd, 2)
- os.close(fd)
--- a/py/execnet/script/loop_socketserver.py
+++ /dev/null
@@ -1,14 +0,0 @@
-
-import os, sys
-import subprocess
-
-if __name__ == '__main__':
- directory = os.path.dirname(os.path.abspath(sys.argv[0]))
- script = os.path.join(directory, 'socketserver.py')
- while 1:
- cmdlist = ["python", script]
- cmdlist.extend(sys.argv[1:])
- text = "starting subcommand: " + " ".join(cmdlist)
- print(text)
- process = subprocess.Popen(cmdlist)
- process.wait()
--- a/doc/test/funcargs.txt
+++ b/doc/test/funcargs.txt
@@ -165,7 +165,7 @@ and to offer a new mysetup method:
host = self.config.option.ssh
if host is None:
py.test.skip("specify ssh host with --ssh")
- return py.execnet.SshGateway(host)
+ return execnet.SshGateway(host)
Now any test function can use the ``mysetup.getsshconnection()`` method like this:
--- a/bin-for-dist/gensetup.py
+++ b/bin-for-dist/gensetup.py
@@ -3,7 +3,7 @@ import sys
sys.path.insert(0, sys.argv[1])
import py
-toolpath = py.magic.autopath()
+toolpath = py.path.local(__file__)
binpath = py.path.local(py.__file__).dirpath('bin')
def error(msg):
--- a/testing/execnet/test_multi.py
+++ /dev/null
@@ -1,58 +0,0 @@
-"""
- tests for
- - multi channels and multi gateways
-
-"""
-
-import py
-
-class TestMultiChannelAndGateway:
- def test_multichannel_receive_each(self):
- class pseudochannel:
- def receive(self):
- return 12
-
- pc1 = pseudochannel()
- pc2 = pseudochannel()
- multichannel = py.execnet.MultiChannel([pc1, pc2])
- l = multichannel.receive_each(withchannel=True)
- assert len(l) == 2
- assert l == [(pc1, 12), (pc2, 12)]
- l = multichannel.receive_each(withchannel=False)
- assert l == [12,12]
-
- def test_multichannel_send_each(self):
- l = [py.execnet.PopenGateway() for x in range(2)]
- gm = py.execnet.MultiGateway(l)
- mc = gm.remote_exec("""
- import os
- channel.send(channel.receive() + 1)
- """)
- mc.send_each(41)
- l = mc.receive_each()
- assert l == [42,42]
-
- def test_multichannel_receive_queue_for_two_subprocesses(self):
- l = [py.execnet.PopenGateway() for x in range(2)]
- gm = py.execnet.MultiGateway(l)
- mc = gm.remote_exec("""
- import os
- channel.send(os.getpid())
- """)
- queue = mc.make_receive_queue()
- ch, item = queue.get(timeout=10)
- ch2, item2 = queue.get(timeout=10)
- assert ch != ch2
- assert ch.gateway != ch2.gateway
- assert item != item2
- mc.waitclose()
-
- def test_multichannel_waitclose(self):
- l = []
- class pseudochannel:
- def waitclose(self):
- l.append(0)
- multichannel = py.execnet.MultiChannel([pseudochannel(), pseudochannel()])
- multichannel.waitclose()
- assert len(l) == 2
-
--- a/testing/execnet/test_xspec.py
+++ /dev/null
@@ -1,151 +0,0 @@
-import py
-
-XSpec = py.execnet.XSpec
-
-class TestXSpec:
- def test_norm_attributes(self):
- spec = XSpec("socket=192.168.102.2:8888//python=c:/this/python2.5//chdir=d:\hello")
- assert spec.socket == "192.168.102.2:8888"
- assert spec.python == "c:/this/python2.5"
- assert spec.chdir == "d:\hello"
- assert spec.nice is None
- assert not hasattr(spec, '_xyz')
-
- py.test.raises(AttributeError, "spec._hello")
-
- spec = XSpec("socket=192.168.102.2:8888//python=python2.5//nice=3")
- assert spec.socket == "192.168.102.2:8888"
- assert spec.python == "python2.5"
- assert spec.chdir is None
- assert spec.nice == "3"
-
- spec = XSpec("ssh=user at host//chdir=/hello/this//python=/usr/bin/python2.5")
- assert spec.ssh == "user at host"
- assert spec.python == "/usr/bin/python2.5"
- assert spec.chdir == "/hello/this"
-
- spec = XSpec("popen")
- assert spec.popen == True
-
- def test__samefilesystem(self):
- assert XSpec("popen")._samefilesystem()
- assert XSpec("popen//python=123")._samefilesystem()
- assert not XSpec("popen//chdir=hello")._samefilesystem()
-
- def test__spec_spec(self):
- for x in ("popen", "popen//python=this"):
- assert XSpec(x)._spec == x
-
- def test_samekeyword_twice_raises(self):
- py.test.raises(ValueError, "XSpec('popen//popen')")
- py.test.raises(ValueError, "XSpec('popen//popen=123')")
-
- def test_unknown_keys_allowed(self):
- xspec = XSpec("hello=3")
- assert xspec.hello == '3'
-
- def test_repr_and_string(self):
- for x in ("popen", "popen//python=this"):
- assert repr(XSpec(x)).find("popen") != -1
- assert str(XSpec(x)) == x
-
- def test_hash_equality(self):
- assert XSpec("popen") == XSpec("popen")
- assert hash(XSpec("popen")) == hash(XSpec("popen"))
- assert XSpec("popen//python=123") != XSpec("popen")
- assert hash(XSpec("socket=hello:8080")) != hash(XSpec("popen"))
-
-class TestMakegateway:
- def test_no_type(self):
- py.test.raises(ValueError, "py.execnet.makegateway('hello')")
-
- def test_popen(self):
- gw = py.execnet.makegateway("popen")
- assert gw.spec.python == None
- rinfo = gw._rinfo()
- assert rinfo.executable == py.std.sys.executable
- assert rinfo.cwd == py.std.os.getcwd()
- assert rinfo.version_info == py.std.sys.version_info
-
- def test_popen_nice(self):
- gw = py.execnet.makegateway("popen//nice=5")
- remotenice = gw.remote_exec("""
- import os
- if hasattr(os, 'nice'):
- channel.send(os.nice(0))
- else:
- channel.send(None)
- """).receive()
- if remotenice is not None:
- assert remotenice == 5
-
- def test_popen_explicit(self):
- gw = py.execnet.makegateway("popen//python=%s" % py.std.sys.executable)
- assert gw.spec.python == py.std.sys.executable
- rinfo = gw._rinfo()
- assert rinfo.executable == py.std.sys.executable
- assert rinfo.cwd == py.std.os.getcwd()
- assert rinfo.version_info == py.std.sys.version_info
-
- def test_popen_cpython25(self):
- for trypath in ('python2.5', r'C:\Python25\python.exe'):
- cpython25 = py.path.local.sysfind(trypath)
- if cpython25 is not None:
- cpython25 = cpython25.realpath()
- break
- else:
- py.test.skip("cpython2.5 not found")
- gw = py.execnet.makegateway("popen//python=%s" % cpython25)
- rinfo = gw._rinfo()
- if py.std.sys.platform != "darwin": # it's confusing there
- assert rinfo.executable == cpython25
- assert rinfo.cwd == py.std.os.getcwd()
- assert rinfo.version_info[:2] == (2,5)
-
- def test_popen_cpython26(self):
- for trypath in ('python2.6', r'C:\Python26\python.exe'):
- cpython26 = py.path.local.sysfind(trypath)
- if cpython26 is not None:
- break
- else:
- py.test.skip("cpython2.6 not found")
- gw = py.execnet.makegateway("popen//python=%s" % cpython26)
- rinfo = gw._rinfo()
- assert rinfo.executable == cpython26
- assert rinfo.cwd == py.std.os.getcwd()
- assert rinfo.version_info[:2] == (2,6)
-
- def test_popen_chdir_absolute(self, testdir):
- gw = py.execnet.makegateway("popen//chdir=%s" % testdir.tmpdir)
- rinfo = gw._rinfo()
- assert rinfo.cwd == str(testdir.tmpdir.realpath())
-
- def test_popen_chdir_newsub(self, testdir):
- testdir.chdir()
- gw = py.execnet.makegateway("popen//chdir=hello")
- rinfo = gw._rinfo()
- assert rinfo.cwd == str(testdir.tmpdir.join("hello").realpath())
-
- def test_ssh(self, specssh):
- sshhost = specssh.ssh
- gw = py.execnet.makegateway("ssh=%s" % sshhost)
- rinfo = gw._rinfo()
- gw2 = py.execnet.SshGateway(sshhost)
- rinfo2 = gw2._rinfo()
- assert rinfo.executable == rinfo2.executable
- assert rinfo.cwd == rinfo2.cwd
- assert rinfo.version_info == rinfo2.version_info
-
- def test_socket(self, specsocket):
- gw = py.execnet.makegateway("socket=%s" % specsocket.socket)
- rinfo = gw._rinfo()
- assert rinfo.executable
- assert rinfo.cwd
- assert rinfo.version_info
- # we cannot instantiate a second gateway
-
- #gw2 = py.execnet.SocketGateway(*specsocket.socket.split(":"))
- #rinfo2 = gw2._rinfo()
- #assert rinfo.executable == rinfo2.executable
- #assert rinfo.cwd == rinfo2.cwd
- #assert rinfo.version_info == rinfo2.version_info
--- a/example/execnet/svn-sync-repo.py
+++ b/example/execnet/svn-sync-repo.py
@@ -82,7 +82,7 @@ def get_svn_youngest(repo):
return int(rev)
def getgateway(host, keyfile=None):
- return py.execnet.SshGateway(host, identity=keyfile)
+ return execnet.SshGateway(host, identity=keyfile)
if __name__ == '__main__':
if len(sys.argv) < 3:
--- a/py/test/config.py
+++ b/py/test/config.py
@@ -252,7 +252,8 @@ class Config(object):
xspeclist.extend([xspec[i+1:]] * num)
if not xspeclist:
raise self.Error("MISSING test execution (tx) nodes: please specify --tx")
- return [py.execnet.XSpec(x) for x in xspeclist]
+ import execnet
+ return [execnet.XSpec(x) for x in xspeclist]
def getrsyncdirs(self):
config = self
--- a/testing/execnet/test_serializer.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# -*- coding: utf-8 -*-
-import sys
-import os
-import tempfile
-import subprocess
-import py
-from py.__.execnet import serializer
-
-
-def _find_version(suffix=""):
- name = "python" + suffix
- executable = py.path.local.sysfind(name)
- if executable is None:
- py.test.skip("can't find a %r executable" % (name,))
- return executable
-
-def setup_module(mod):
- mod.TEMPDIR = py.path.local(tempfile.mkdtemp())
- if sys.version_info > (3, 0):
- mod._py3_wrapper = PythonWrapper(py.path.local(sys.executable))
- mod._py2_wrapper = PythonWrapper(_find_version())
- else:
- mod._py3_wrapper = PythonWrapper(_find_version("3"))
- mod._py2_wrapper = PythonWrapper(py.path.local(sys.executable))
- mod._old_pypath = os.environ.get("PYTHONPATH")
- pylib = str(py.path.local(py.__file__).dirpath().join(".."))
- os.environ["PYTHONPATH"] = pylib
-
-def teardown_module(mod):
- TEMPDIR.remove(True)
- if _old_pypath is not None:
- os.environ["PYTHONPATH"] = _old_pypath
-
-
-class PythonWrapper(object):
-
- def __init__(self, executable):
- self.executable = executable
-
- def dump(self, obj_rep):
- script_file = TEMPDIR.join("dump.py")
- script_file.write("""
-from py.__.execnet import serializer
-import sys
-if sys.version_info > (3, 0): # Need binary output
- sys.stdout = sys.stdout.detach()
-saver = serializer.Serializer(sys.stdout)
-saver.save(%s)""" % (obj_rep,))
- return self.executable.sysexec(script_file)
-
- def load(self, data, option_args=""):
- script_file = TEMPDIR.join("load.py")
- script_file.write(r"""
-from py.__.execnet import serializer
-import sys
-if sys.version_info > (3, 0):
- sys.stdin = sys.stdin.detach()
-options = serializer.UnserializationOptions(%s)
-loader = serializer.Unserializer(sys.stdin, options)
-obj = loader.load()
-sys.stdout.write(type(obj).__name__ + "\n")
-sys.stdout.write(repr(obj))""" % (option_args,))
- popen = subprocess.Popen([str(self.executable), str(script_file)],
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdout=subprocess.PIPE)
- stdout, stderr = popen.communicate(data.encode("latin-1"))
- ret = popen.returncode
- if ret:
- raise py.process.cmdexec.Error(ret, ret, str(self.executable),
- stdout, stderr)
- return [s.decode("ascii") for s in stdout.splitlines()]
-
- def __repr__(self):
- return "<PythonWrapper for %s>" % (self.executable,)
-
-
-def pytest_funcarg__py2(request):
- return _py2_wrapper
-
-def pytest_funcarg__py3(request):
- return _py3_wrapper
-
-def pytest_funcarg__dump(request):
- py_dump = request.getfuncargvalue(request.param[0])
- return py_dump.dump
-
-def pytest_funcarg__load(request):
- py_dump = request.getfuncargvalue(request.param[1])
- return py_dump.load
-
-def pytest_generate_tests(metafunc):
- if 'dump' in metafunc.funcargnames and 'load' in metafunc.funcargnames:
- pys = 'py2', 'py3'
- for dump in pys:
- for load in pys:
- param = (dump, load)
- conversion = '%s to %s'%param
- if 'repr' not in metafunc.funcargnames:
- metafunc.addcall(id=conversion, param=param)
- else:
- for tp, repr in simple_tests.items():
- metafunc.addcall(
- id='%s:%s'%(tp, conversion),
- param=param,
- funcargs={'tp_name':tp, 'repr':repr},
- )
-
-
-simple_tests = {
-# type: expected before/after repr
- 'int': '4',
- 'float':'3.25',
- 'list': '[1, 2, 3]',
- 'tuple': '(1, 2, 3)',
- 'dict': '{6: 2, (1, 2, 3): 32}',
-}
-
-def test_simple(tp_name, repr, dump, load):
- p = dump(repr)
- tp , v = load(p)
- assert tp == tp_name
- assert v == repr
-
-
- at py.test.mark.xfail
-# I'm not sure if we need the complexity.
-def test_recursive_list(py2, py3):
- l = [1, 2, 3]
- l.append(l)
- p = py2.dump(l)
- tp, rep = py2.load(l)
- assert tp == "list"
-
-def test_bigint_should_fail():
- py.test.raises(serializer.SerializationError,
- serializer.Serializer(py.io.BytesIO()).save,
- 123456678900)
-
-def test_bytes(py2, py3):
- p = py3.dump("b'hi'")
- tp, v = py2.load(p)
- assert tp == "str"
- assert v == "'hi'"
- tp, v = py3.load(p)
- assert tp == "bytes"
- assert v == "b'hi'"
-
-def test_string(py2, py3):
- p = py2.dump("'xyz'")
- tp, s = py2.load(p)
- assert tp == "str"
- assert s == "'xyz'"
- tp, s = py3.load(p)
- assert tp == "bytes"
- assert s == "b'xyz'"
- tp, s = py3.load(p, "True")
- assert tp == "str"
- assert s == "'xyz'"
- p = py3.dump("'xyz'")
- tp, s = py2.load(p, True)
- assert tp == "str"
- assert s == "'xyz'"
-
-def test_unicode(py2, py3):
- p = py2.dump("u'hi'")
- tp, s = py2.load(p)
- assert tp == "unicode"
- assert s == "u'hi'"
- tp, s = py3.load(p)
- assert tp == "str"
- assert s == "'hi'"
- p = py3.dump("'hi'")
- tp, s = py3.load(p)
- assert tp == "str"
- assert s == "'hi'"
- tp, s = py2.load(p)
- assert tp == "unicode"
- assert s == "u'hi'"
--- a/py/execnet/rsync.py
+++ /dev/null
@@ -1,201 +0,0 @@
-"""
-1:N rsync implemenation on top of execnet.
-
-(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
-"""
-import py, os, stat
-
-md5 = py.builtin._tryimport('hashlib', 'md5').md5
-Queue = py.builtin._tryimport('queue', 'Queue').Queue
-
-class RSync(object):
- """ This class allows to send a directory structure (recursively)
- to one or multiple remote filesystems.
-
- There is limited support for symlinks, which means that symlinks
- pointing to the sourcetree will be send "as is" while external
- symlinks will be just copied (regardless of existance of such
- a path on remote side).
- """
- def __init__(self, sourcedir, callback=None, verbose=True):
- self._sourcedir = str(sourcedir)
- self._verbose = verbose
- assert callback is None or py.builtin.callable(callback)
- self._callback = callback
- self._channels = {}
- self._receivequeue = Queue()
- self._links = []
-
- def filter(self, path):
- return True
-
- def _end_of_channel(self, channel):
- if channel in self._channels:
- # too early! we must have got an error
- channel.waitclose()
- # or else we raise one
- raise IOError('connection unexpectedly closed: %s ' % (
- channel.gateway,))
-
- def _process_link(self, channel):
- for link in self._links:
- channel.send(link)
- # completion marker, this host is done
- channel.send(42)
-
- def _done(self, channel):
- """ Call all callbacks
- """
- finishedcallback = self._channels.pop(channel)
- if finishedcallback:
- finishedcallback()
-
- def _list_done(self, channel):
- # sum up all to send
- if self._callback:
- s = sum([self._paths[i] for i in self._to_send[channel]])
- self._callback("list", s, channel)
-
- def _send_item(self, channel, data):
- """ Send one item
- """
- modified_rel_path, checksum = data
- modifiedpath = os.path.join(self._sourcedir, *modified_rel_path)
- try:
- f = open(modifiedpath, 'rb')
- data = f.read()
- except IOError:
- data = None
-
- # provide info to progress callback function
- modified_rel_path = "/".join(modified_rel_path)
- if data is not None:
- self._paths[modified_rel_path] = len(data)
- else:
- self._paths[modified_rel_path] = 0
- if channel not in self._to_send:
- self._to_send[channel] = []
- self._to_send[channel].append(modified_rel_path)
- #print "sending", modified_rel_path, data and len(data) or 0, checksum
-
- if data is not None:
- f.close()
- if checksum is not None and checksum == md5(data).digest():
- data = None # not really modified
- else:
- self._report_send_file(channel.gateway, modified_rel_path)
- channel.send(data)
-
- def _report_send_file(self, gateway, modified_rel_path):
- if self._verbose:
- print("%s <= %s" %(gateway, modified_rel_path))
-
- def send(self, raises=True):
- """ Sends a sourcedir to all added targets. Flag indicates
- whether to raise an error or return in case of lack of
- targets
- """
- if not self._channels:
- if raises:
- raise IOError("no targets available, maybe you "
- "are trying call send() twice?")
- return
- # normalize a trailing '/' away
- self._sourcedir = os.path.dirname(os.path.join(self._sourcedir, 'x'))
- # send directory structure and file timestamps/sizes
- self._send_directory_structure(self._sourcedir)
-
- # paths and to_send are only used for doing
- # progress-related callbacks
- self._paths = {}
- self._to_send = {}
-
- # send modified file to clients
- while self._channels:
- channel, req = self._receivequeue.get()
- if req is None:
- self._end_of_channel(channel)
- else:
- command, data = req
- if command == "links":
- self._process_link(channel)
- elif command == "done":
- self._done(channel)
- elif command == "ack":
- if self._callback:
- self._callback("ack", self._paths[data], channel)
- elif command == "list_done":
- self._list_done(channel)
- elif command == "send":
- self._send_item(channel, data)
- del data
- else:
- assert "Unknown command %s" % command
-
- def add_target(self, gateway, destdir,
- finishedcallback=None, **options):
- """ Adds a remote target specified via a 'gateway'
- and a remote destination directory.
- """
- assert finishedcallback is None or py.builtin.callable(finishedcallback)
- for name in options:
- assert name in ('delete',)
- def itemcallback(req):
- self._receivequeue.put((channel, req))
- channel = gateway.remote_exec(REMOTE_SOURCE)
- channel.setcallback(itemcallback, endmarker = None)
- channel.send((str(destdir), options))
- self._channels[channel] = finishedcallback
-
- def _broadcast(self, msg):
- for channel in self._channels:
- channel.send(msg)
-
- def _send_link(self, basename, linkpoint):
- self._links.append(("link", basename, linkpoint))
-
- def _send_directory(self, path):
- # dir: send a list of entries
- names = []
- subpaths = []
- for name in os.listdir(path):
- p = os.path.join(path, name)
- if self.filter(p):
- names.append(name)
- subpaths.append(p)
- self._broadcast(names)
- for p in subpaths:
- self._send_directory_structure(p)
-
- def _send_link_structure(self, path):
- linkpoint = os.readlink(path)
- basename = path[len(self._sourcedir) + 1:]
- if not linkpoint.startswith(os.sep):
- # relative link, just send it
- # XXX: do sth with ../ links
- self._send_link(basename, linkpoint)
- elif linkpoint.startswith(self._sourcedir):
- self._send_link(basename, linkpoint[len(self._sourcedir) + 1:])
- else:
- self._send_link(basename, linkpoint)
- self._broadcast(None)
-
- def _send_directory_structure(self, path):
- try:
- st = os.lstat(path)
- except OSError:
- self._broadcast((0, 0))
- return
- if stat.S_ISREG(st.st_mode):
- # regular file: send a timestamp/size pair
- self._broadcast((st.st_mtime, st.st_size))
- elif stat.S_ISDIR(st.st_mode):
- self._send_directory(path)
- elif stat.S_ISLNK(st.st_mode):
- self._send_link_structure(path)
- else:
- raise ValueError("cannot sync %r" % (path,))
-
-REMOTE_SOURCE = py.path.local(__file__).dirpath().\
- join('rsync_remote.py').open().read() + "\nf()"
-
--- a/py/test/dist/mypickle.py
+++ b/py/test/dist/mypickle.py
@@ -13,7 +13,7 @@
"""
import py
-from py.__.execnet.gateway_base import Channel
+from execnet.gateway_base import Channel
import sys, os, struct
#debug = open("log-mypickle-%d" % os.getpid(), 'w')
--- a/py/execnet/xspec.py
+++ /dev/null
@@ -1,79 +0,0 @@
-"""
-(c) 2008-2009, holger krekel
-"""
-import py
-
-class XSpec:
- """ Execution Specification: key1=value1//key2=value2 ...
- * keys need to be unique within the specification scope
- * neither key nor value are allowed to contain "//"
- * keys are not allowed to contain "="
- * keys are not allowed to start with underscore
- * if no "=value" is given, assume a boolean True value
- """
- # XXX allow customization, for only allow specific key names
- popen = ssh = socket = python = chdir = nice = None
-
- def __init__(self, string):
- self._spec = string
- for keyvalue in string.split("//"):
- i = keyvalue.find("=")
- if i == -1:
- key, value = keyvalue, True
- else:
- key, value = keyvalue[:i], keyvalue[i+1:]
- if key[0] == "_":
- raise AttributeError("%r not a valid XSpec key" % key)
- if key in self.__dict__:
- raise ValueError("duplicate key: %r in %r" %(key, string))
- setattr(self, key, value)
-
- def __getattr__(self, name):
- if name[0] == "_":
- raise AttributeError(name)
- return None
-
- def __repr__(self):
- return "<XSpec %r>" %(self._spec,)
- def __str__(self):
- return self._spec
-
- def __hash__(self):
- return hash(self._spec)
- def __eq__(self, other):
- return self._spec == getattr(other, '_spec', None)
- def __ne__(self, other):
- return self._spec != getattr(other, '_spec', None)
-
- def _samefilesystem(self):
- return bool(self.popen and not self.chdir)
-
-def makegateway(spec):
- if not isinstance(spec, XSpec):
- spec = XSpec(spec)
- if spec.popen:
- gw = py.execnet.PopenGateway(python=spec.python)
- elif spec.ssh:
- gw = py.execnet.SshGateway(spec.ssh, remotepython=spec.python)
- elif spec.socket:
- assert not spec.python, "socket: specifying python executables not supported"
- hostport = spec.socket.split(":")
- gw = py.execnet.SocketGateway(*hostport)
- else:
- raise ValueError("no gateway type found for %r" % (spec._spec,))
- gw.spec = spec
- if spec.chdir or spec.nice:
- channel = gw.remote_exec("""
- import os
- path, nice = channel.receive()
- if path:
- if not os.path.exists(path):
- os.mkdir(path)
- os.chdir(path)
- if nice and hasattr(os, 'nice'):
- os.nice(nice)
- """)
- nice = spec.nice and int(spec.nice) or 0
- channel.send((spec.chdir, nice))
- channel.waitclose()
- return gw
--- a/py/execnet/multi.py
+++ /dev/null
@@ -1,71 +0,0 @@
-"""
-Support for working with multiple channels and gateways
-
-(c) 2008-2009, Holger Krekel and others
-"""
-import py
-try:
- import queue
-except ImportError:
- import Queue as queue
-
-NO_ENDMARKER_WANTED = object()
-
-class MultiGateway:
- def __init__(self, gateways):
- self.gateways = gateways
- def remote_exec(self, source):
- channels = []
- for gw in self.gateways:
- channels.append(gw.remote_exec(source))
- return MultiChannel(channels)
- def exit(self):
- for gw in self.gateways:
- gw.exit()
-
-class MultiChannel:
- def __init__(self, channels):
- self._channels = channels
-
- def send_each(self, item):
- for ch in self._channels:
- ch.send(item)
-
- def receive_each(self, withchannel=False):
- assert not hasattr(self, '_queue')
- l = []
- for ch in self._channels:
- obj = ch.receive()
- if withchannel:
- l.append((ch, obj))
- else:
- l.append(obj)
- return l
-
- def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
- try:
- return self._queue
- except AttributeError:
- self._queue = queue.Queue()
- for ch in self._channels:
- def putreceived(obj, channel=ch):
- self._queue.put((channel, obj))
- if endmarker is NO_ENDMARKER_WANTED:
- ch.setcallback(putreceived)
- else:
- ch.setcallback(putreceived, endmarker=endmarker)
- return self._queue
-
-
- def waitclose(self):
- first = None
- for ch in self._channels:
- try:
- ch.waitclose()
- except ch.RemoteError:
- if first is None:
- first = py.std.sys.exc_info()
- if first:
- py.builtin._reraise(first[0], first[1], first[2])
-
-
--- a/testing/pytest/dist/test_gwmanage.py
+++ b/testing/pytest/dist/test_gwmanage.py
@@ -9,6 +9,7 @@ import py
import os
from py.__.test.dist.gwmanage import GatewayManager, HostRSync
from py.__.test.plugin import hookspec
+import execnet
def pytest_funcarg__hookrecorder(request):
_pytest = request.getfuncargvalue('_pytest')
@@ -35,7 +36,7 @@ class TestGatewayManagerPopen:
hm = GatewayManager(["popen"] * 2, hook)
hm.makegateways()
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
- assert call.gateway.spec == py.execnet.XSpec("popen")
+ assert call.gateway.spec == execnet.XSpec("popen")
assert call.gateway.id == "[1]"
assert call.platinfo.executable == call.gateway._rinfo().executable
call = hookrecorder.popcall("pytest_gwmanage_newgateway")
@@ -149,7 +150,7 @@ class TestHRSync:
def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
- gw = py.execnet.makegateway("popen//chdir=%s" % dest)
+ gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
--- a/py/path/gateway/channeltest2.py
+++ b/py/path/gateway/channeltest2.py
@@ -11,8 +11,8 @@ channel.send(srv.p2c(py.path.local("/tmp
'''
-#gw = py.execnet.SshGateway('codespeak.net')
-gw = py.execnet.PopenGateway()
+#gw = execnet.SshGateway('codespeak.net')
+gw = execnet.PopenGateway()
gw.remote_init_threads(5)
c = gw.remote_exec(SRC, stdout=py.std.sys.stdout, stderr=py.std.sys.stderr)
subchannel = gw._channelfactory.new()
--- a/contrib/svn-sync-repo.py
+++ b/contrib/svn-sync-repo.py
@@ -3,7 +3,7 @@
"""
small utility for hot-syncing a svn repository through ssh.
-uses py.execnet.
+uses execnet.
"""
@@ -105,7 +105,7 @@ def get_svn_youngest(repo):
return int(rev)
def getgateway(host, keyfile=None):
- return py.execnet.SshGateway(host, identity=keyfile)
+ return execnet.SshGateway(host, identity=keyfile)
if __name__ == '__main__':
if len(sys.argv) < 3:
--- a/testing/pytest/plugin/test_pytest_terminal.py
+++ b/testing/pytest/plugin/test_pytest_terminal.py
@@ -105,6 +105,7 @@ class TestTerminal:
])
def test_gwmanage_events(self, testdir, linecomp):
+ execnet = py.test.importorskip("execnet")
modcol = testdir.getmodulecol("""
def test_one():
pass
@@ -113,10 +114,10 @@ class TestTerminal:
rep = TerminalReporter(modcol.config, file=linecomp.stringio)
class gw1:
id = "X1"
- spec = py.execnet.XSpec("popen")
+ spec = execnet.XSpec("popen")
class gw2:
id = "X2"
- spec = py.execnet.XSpec("popen")
+ spec = execnet.XSpec("popen")
class rinfo:
version_info = (2, 5, 1, 'final', 0)
executable = "hello"
--- a/testing/execnet/test_gateway.py
+++ /dev/null
@@ -1,545 +0,0 @@
-"""
-mostly functional tests of gateways.
-"""
-import os, sys, time
-import py
-from py.__.execnet import gateway_base, gateway
-queue = py.builtin._tryimport('queue', 'Queue')
-
-TESTTIMEOUT = 10.0 # seconds
-
-class TestBasicRemoteExecution:
- def test_correct_setup(self, gw):
- assert gw._receiverthread.isAlive()
-
- def test_repr_doesnt_crash(self, gw):
- assert isinstance(repr(gw), str)
-
- def test_attribute__name__(self, gw):
- channel = gw.remote_exec("channel.send(__name__)")
- name = channel.receive()
- assert name == "__channelexec__"
-
- def test_correct_setup_no_py(self, gw):
- channel = gw.remote_exec("""
- import sys
- channel.send(list(sys.modules))
- """)
- remotemodules = channel.receive()
- assert 'py' not in remotemodules, (
- "py should not be imported on remote side")
-
- def test_remote_exec_waitclose(self, gw):
- channel = gw.remote_exec('pass')
- channel.waitclose(TESTTIMEOUT)
-
- def test_remote_exec_waitclose_2(self, gw):
- channel = gw.remote_exec('def gccycle(): pass')
- channel.waitclose(TESTTIMEOUT)
-
- def test_remote_exec_waitclose_noarg(self, gw):
- channel = gw.remote_exec('pass')
- channel.waitclose()
-
- def test_remote_exec_error_after_close(self, gw):
- channel = gw.remote_exec('pass')
- channel.waitclose(TESTTIMEOUT)
- py.test.raises(IOError, channel.send, 0)
-
- def test_remote_exec_channel_anonymous(self, gw):
- channel = gw.remote_exec('''
- obj = channel.receive()
- channel.send(obj)
- ''')
- channel.send(42)
- result = channel.receive()
- assert result == 42
-
-class TestChannelBasicBehaviour:
- def test_channel_close_and_then_receive_error(self, gw):
- channel = gw.remote_exec('raise ValueError')
- py.test.raises(channel.RemoteError, channel.receive)
-
- def test_channel_finish_and_then_EOFError(self, gw):
- channel = gw.remote_exec('channel.send(42)')
- x = channel.receive()
- assert x == 42
- py.test.raises(EOFError, channel.receive)
- py.test.raises(EOFError, channel.receive)
- py.test.raises(EOFError, channel.receive)
-
- def test_channel_close_and_then_receive_error_multiple(self, gw):
- channel = gw.remote_exec('channel.send(42) ; raise ValueError')
- x = channel.receive()
- assert x == 42
- py.test.raises(channel.RemoteError, channel.receive)
-
- def test_channel__local_close(self, gw):
- channel = gw._channelfactory.new()
- gw._channelfactory._local_close(channel.id)
- channel.waitclose(0.1)
-
- def test_channel__local_close_error(self, gw):
- channel = gw._channelfactory.new()
- gw._channelfactory._local_close(channel.id,
- channel.RemoteError("error"))
- py.test.raises(channel.RemoteError, channel.waitclose, 0.01)
-
- def test_channel_error_reporting(self, gw):
- channel = gw.remote_exec('def foo():\n return foobar()\nfoo()\n')
- try:
- channel.receive()
- except channel.RemoteError:
- e = sys.exc_info()[1]
- assert str(e).startswith('Traceback (most recent call last):')
- assert str(e).find('NameError: global name \'foobar\' '
- 'is not defined') > -1
- else:
- py.test.fail('No exception raised')
-
- def test_channel_syntax_error(self, gw):
- # missing colon
- channel = gw.remote_exec('def foo()\n return 1\nfoo()\n')
- try:
- channel.receive()
- except channel.RemoteError:
- e = sys.exc_info()[1]
- assert str(e).startswith('Traceback (most recent call last):')
- assert str(e).find('SyntaxError') > -1
-
- def test_channel_iter(self, gw):
- channel = gw.remote_exec("""
- for x in range(3):
- channel.send(x)
- """)
- l = list(channel)
- assert l == [0, 1, 2]
-
- def test_channel_passing_over_channel(self, gw):
- channel = gw.remote_exec('''
- c = channel.gateway.newchannel()
- channel.send(c)
- c.send(42)
- ''')
- c = channel.receive()
- x = c.receive()
- assert x == 42
-
- # check that the both sides previous channels are really gone
- channel.waitclose(TESTTIMEOUT)
- #assert c.id not in gw._channelfactory
- newchan = gw.remote_exec('''
- assert %d not in channel.gateway._channelfactory._channels
- ''' % (channel.id))
- newchan.waitclose(TESTTIMEOUT)
- assert channel.id not in gw._channelfactory._channels
-
- def test_channel_receiver_callback(self, gw):
- l = []
- #channel = gw.newchannel(receiver=l.append)
- channel = gw.remote_exec(source='''
- channel.send(42)
- channel.send(13)
- channel.send(channel.gateway.newchannel())
- ''')
- channel.setcallback(callback=l.append)
- py.test.raises(IOError, channel.receive)
- channel.waitclose(TESTTIMEOUT)
- assert len(l) == 3
- assert l[:2] == [42,13]
- assert isinstance(l[2], channel.__class__)
-
- def test_channel_callback_after_receive(self, gw):
- l = []
- channel = gw.remote_exec(source='''
- channel.send(42)
- channel.send(13)
- channel.send(channel.gateway.newchannel())
- ''')
- x = channel.receive()
- assert x == 42
- channel.setcallback(callback=l.append)
- py.test.raises(IOError, channel.receive)
- channel.waitclose(TESTTIMEOUT)
- assert len(l) == 2
- assert l[0] == 13
- assert isinstance(l[1], channel.__class__)
-
- def test_waiting_for_callbacks(self, gw):
- l = []
- def callback(msg):
- import time; time.sleep(0.2)
- l.append(msg)
- channel = gw.remote_exec(source='''
- channel.send(42)
- ''')
- channel.setcallback(callback)
- channel.waitclose(TESTTIMEOUT)
- assert l == [42]
-
- def test_channel_callback_stays_active(self, gw):
- self.check_channel_callback_stays_active(gw, earlyfree=True)
-
- def check_channel_callback_stays_active(self, gw, earlyfree=True):
- # with 'earlyfree==True', this tests the "sendonly" channel state.
- l = []
- channel = gw.remote_exec(source='''
- try:
- import thread
- except ImportError:
- import _thread as thread
- import time
- def producer(subchannel):
- for i in range(5):
- time.sleep(0.15)
- subchannel.send(i*100)
- channel2 = channel.receive()
- thread.start_new_thread(producer, (channel2,))
- del channel2
- ''')
- subchannel = gw.newchannel()
- subchannel.setcallback(l.append)
- channel.send(subchannel)
- if earlyfree:
- subchannel = None
- counter = 100
- while len(l) < 5:
- if subchannel and subchannel.isclosed():
- break
- counter -= 1
- print(counter)
- if not counter:
- py.test.fail("timed out waiting for the answer[%d]" % len(l))
- time.sleep(0.04) # busy-wait
- assert l == [0, 100, 200, 300, 400]
- return subchannel
-
- def test_channel_callback_remote_freed(self, gw):
- channel = self.check_channel_callback_stays_active(gw, earlyfree=False)
- # freed automatically at the end of producer()
- channel.waitclose(TESTTIMEOUT)
-
- def test_channel_endmarker_callback(self, gw):
- l = []
- channel = gw.remote_exec(source='''
- channel.send(42)
- channel.send(13)
- channel.send(channel.gateway.newchannel())
- ''')
- channel.setcallback(l.append, 999)
- py.test.raises(IOError, channel.receive)
- channel.waitclose(TESTTIMEOUT)
- assert len(l) == 4
- assert l[:2] == [42,13]
- assert isinstance(l[2], channel.__class__)
- assert l[3] == 999
-
- def test_channel_endmarker_callback_error(self, gw):
- q = queue.Queue()
- channel = gw.remote_exec(source='''
- raise ValueError()
- ''')
- channel.setcallback(q.put, endmarker=999)
- val = q.get(TESTTIMEOUT)
- assert val == 999
- err = channel._getremoteerror()
- assert err
- assert str(err).find("ValueError") != -1
-
- @py.test.mark.xfail
- def test_remote_redirect_stdout(self, gw):
- out = py.io.TextIO()
- handle = gw._remote_redirect(stdout=out)
- c = gw.remote_exec("print 42")
- c.waitclose(TESTTIMEOUT)
- handle.close()
- s = out.getvalue()
- assert s.strip() == "42"
-
- @py.test.mark.xfail
- def test_remote_exec_redirect_multi(self, gw):
- num = 3
- l = [[] for x in range(num)]
- channels = [gw.remote_exec("print %d" % i,
- stdout=l[i].append)
- for i in range(num)]
- for x in channels:
- x.waitclose(TESTTIMEOUT)
-
- for i in range(num):
- subl = l[i]
- assert subl
- s = subl[0]
- assert s.strip() == str(i)
-
-class TestChannelFile:
- def test_channel_file_write(self, gw):
- channel = gw.remote_exec("""
- f = channel.makefile()
- f.write("hello world\\n")
- f.close()
- channel.send(42)
- """)
- first = channel.receive()
- assert first.strip() == 'hello world'
- second = channel.receive()
- assert second == 42
-
- def test_channel_file_write_error(self, gw):
- channel = gw.remote_exec("pass")
- f = channel.makefile()
- channel.waitclose(TESTTIMEOUT)
- py.test.raises(IOError, f.write, 'hello')
-
- def test_channel_file_proxyclose(self, gw):
- channel = gw.remote_exec("""
- f = channel.makefile(proxyclose=True)
- f.write("hello world")
- f.close()
- channel.send(42)
- """)
- first = channel.receive()
- assert first.strip() == 'hello world'
- py.test.raises(EOFError, channel.receive)
-
- def test_channel_file_read(self, gw):
- channel = gw.remote_exec("""
- f = channel.makefile(mode='r')
- s = f.read(2)
- channel.send(s)
- s = f.read(5)
- channel.send(s)
- """)
- channel.send("xyabcde")
- s1 = channel.receive()
- s2 = channel.receive()
- assert s1 == "xy"
- assert s2 == "abcde"
-
- def test_channel_file_read_empty(self, gw):
- channel = gw.remote_exec("pass")
- f = channel.makefile(mode="r")
- s = f.read(3)
- assert s == ""
- s = f.read(5)
- assert s == ""
-
- def test_channel_file_readline_remote(self, gw):
- channel = gw.remote_exec("""
- channel.send('123\\n45')
- """)
- channel.waitclose(TESTTIMEOUT)
- f = channel.makefile(mode="r")
- s = f.readline()
- assert s == "123\n"
- s = f.readline()
- assert s == "45"
-
- def test_channel_makefile_incompatmode(self, gw):
- channel = gw.newchannel()
- py.test.raises(ValueError, 'channel.makefile("rw")')
-
- def test_confusion_from_os_write_stdout(self, gw):
- channel = gw.remote_exec("""
- import os
- os.write(1, 'confusion!'.encode('ascii'))
- channel.send(channel.receive() * 6)
- channel.send(channel.receive() * 6)
- """)
- channel.send(3)
- res = channel.receive()
- assert res == 18
- channel.send(7)
- res = channel.receive()
- assert res == 42
-
- def test_confusion_from_os_write_stderr(self, gw):
- channel = gw.remote_exec("""
- import os
- os.write(2, 'test'.encode('ascii'))
- channel.send(channel.receive() * 6)
- channel.send(channel.receive() * 6)
- """)
- channel.send(3)
- res = channel.receive()
- assert res == 18
- channel.send(7)
- res = channel.receive()
- assert res == 42
-
- def test__rinfo(self, gw):
- rinfo = gw._rinfo()
- assert rinfo.executable
- assert rinfo.cwd
- assert rinfo.version_info
- s = repr(rinfo)
- old = gw.remote_exec("""
- import os.path
- cwd = os.getcwd()
- channel.send(os.path.basename(cwd))
- os.chdir('..')
- """).receive()
- try:
- rinfo2 = gw._rinfo()
- assert rinfo2.cwd == rinfo.cwd
- rinfo3 = gw._rinfo(update=True)
- assert rinfo3.cwd != rinfo2.cwd
- finally:
- gw._cache_rinfo = rinfo
- gw.remote_exec("import os ; os.chdir(%r)" % old).waitclose()
-
-def test_join_blocked_execution_gateway():
- gateway = py.execnet.PopenGateway()
- channel = gateway.remote_exec("""
- import time
- time.sleep(5.0)
- """)
- def doit():
- gateway.exit()
- gateway.join(joinexec=True)
- return 17
-
- pool = py._thread.WorkerPool()
- reply = pool.dispatch(doit)
- x = reply.get(timeout=1.0)
- assert x == 17
-
-class TestPopenGateway:
- gwtype = 'popen'
-
- def test_chdir_separation(self, tmpdir):
- old = tmpdir.chdir()
- try:
- gw = py.execnet.PopenGateway()
- finally:
- waschangedir = old.chdir()
- c = gw.remote_exec("import os ; channel.send(os.getcwd())")
- x = c.receive()
- assert x == str(waschangedir)
-
- def test_many_popen(self):
- num = 4
- l = []
- for i in range(num):
- l.append(py.execnet.PopenGateway())
- channels = []
- for gw in l:
- channel = gw.remote_exec("""channel.send(42)""")
- channels.append(channel)
-## try:
-## while channels:
-## channel = channels.pop()
-## try:
-## ret = channel.receive()
-## assert ret == 42
-## finally:
-## channel.gateway.exit()
-## finally:
-## for x in channels:
-## x.gateway.exit()
- while channels:
- channel = channels.pop()
- ret = channel.receive()
- assert ret == 42
-
- def test_rinfo_popen(self, gw):
- rinfo = gw._rinfo()
- assert rinfo.executable == py.std.sys.executable
- assert rinfo.cwd == py.std.os.getcwd()
- assert rinfo.version_info == py.std.sys.version_info
-
- def test_gateway_init_event(self, _pytest):
- rec = _pytest.gethookrecorder(gateway.ExecnetAPI)
- gw = py.execnet.PopenGateway()
- call = rec.popcall("pyexecnet_gateway_init")
- assert call.gateway == gw
- gw.exit()
- call = rec.popcall("pyexecnet_gateway_exit")
- assert call.gateway == gw
-
- @py.test.mark.xfail # "fix needed: dying remote process does not cause waitclose() to fail"
- def test_waitclose_on_remote_killed(self):
- gw = py.execnet.PopenGateway()
- channel = gw.remote_exec("""
- import os
- import time
- channel.send(os.getpid())
- while 1:
- channel.send("#" * 100)
- """)
- remotepid = channel.receive()
- py.process.kill(remotepid)
- py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)")
- py.test.raises(EOFError, channel.send, None)
- py.test.raises(EOFError, channel.receive)
-
- at py.test.mark.xfail
-def test_endmarker_delivery_on_remote_killterm():
- if not hasattr(py.std.os, 'kill'):
- py.test.skip("no os.kill()")
- gw = py.execnet.PopenGateway()
- try:
- q = queue.Queue()
- channel = gw.remote_exec(source='''
- import os
- os.kill(os.getpid(), 15)
- ''')
- channel.setcallback(q.put, endmarker=999)
- val = q.get(TESTTIMEOUT)
- assert val == 999
- err = channel._getremoteerror()
- finally:
- gw.exit()
- assert "killed" in str(err)
- assert "15" in str(err)
-
-
-def test_socket_gw_host_not_found(gw):
- py.test.raises(py.execnet.HostNotFound,
- 'py.execnet.SocketGateway("qowieuqowe", 9000)'
- )
-
-class TestSshPopenGateway:
- gwtype = "ssh"
-
- def test_sshconfig_config_parsing(self, monkeypatch):
- import subprocess
- l = []
- monkeypatch.setattr(subprocess, 'Popen',
- lambda *args, **kwargs: l.append(args[0]))
- py.test.raises(AttributeError,
- """py.execnet.SshGateway("xyz", ssh_config='qwe')""")
- assert len(l) == 1
- popen_args = l[0]
- i = popen_args.index('-F')
- assert popen_args[i+1] == "qwe"
-
- def test_sshaddress(self, gw, specssh):
- assert gw.remoteaddress == specssh.ssh
-
- def test_host_not_found(self):
- py.test.raises(py.execnet.HostNotFound,
- "py.execnet.SshGateway('nowhere.codespeak.net')")
-
-class TestThreads:
- def test_threads(self):
- gw = py.execnet.PopenGateway()
- gw.remote_init_threads(3)
- c1 = gw.remote_exec("channel.send(channel.receive())")
- c2 = gw.remote_exec("channel.send(channel.receive())")
- c2.send(1)
- res = c2.receive()
- assert res == 1
- c1.send(42)
- res = c1.receive()
- assert res == 42
-
- def test_threads_twice(self):
- gw = py.execnet.PopenGateway()
- gw.remote_init_threads(3)
- py.test.raises(IOError, gw.remote_init_threads, 3)
-
-
-def test_nodebug():
- from py.__.execnet import gateway_base
- assert not gateway_base.debug
--- a/testing/pytest/plugin/test_pytest_execnetcleanup.py
+++ /dev/null
@@ -1,12 +0,0 @@
-def test_execnetplugin(testdir):
- reprec = testdir.inline_runsource("""
- import py
- import sys
- def test_hello():
- sys._gw = py.execnet.PopenGateway()
- def test_world():
- assert hasattr(sys, '_gw')
- assert sys._gw not in sys._gw._cleanup._activegateways
-
- """, "-s", "--debug")
- reprec.assertoutcome(passed=2)
--- a/testing/pytest/test_pickling.py
+++ b/testing/pytest/test_pickling.py
@@ -182,8 +182,9 @@ class TestConfigPickling:
old.chdir()
def test_config__setstate__wired_correctly_in_childprocess(testdir):
+ execnet = py.test.importorskip("execnet")
from py.__.test.dist.mypickle import PickleChannel
- gw = py.execnet.PopenGateway()
+ gw = execnet.PopenGateway()
channel = gw.remote_exec("""
import py
from py.__.test.dist.mypickle import PickleChannel
--- a/py/test/defaultconftest.py
+++ b/py/test/defaultconftest.py
@@ -10,5 +10,5 @@ Generator = py.test.collect.Generator
Function = py.test.collect.Function
Instance = py.test.collect.Instance
-pytest_plugins = "default runner capture terminal keyword xfail tmpdir execnetcleanup monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split()
+pytest_plugins = "default runner capture terminal keyword xfail tmpdir monkeypatch recwarn pdb pastebin unittest helpconfig nose assertion".split()
--- a/testing/pytest/dist/test_dsession.py
+++ b/testing/pytest/dist/test_dsession.py
@@ -1,8 +1,9 @@
from py.__.test.dist.dsession import DSession
from py.__.test import outcome
import py
+import execnet
-XSpec = py.execnet.XSpec
+XSpec = execnet.XSpec
def run(item, node, excinfo=None):
runner = item.config.pluginmanager.getplugin("runner")
--- a/setup.py
+++ b/setup.py
@@ -8,22 +8,20 @@ from setuptools import setup
long_description = """
-advanced testing and development support library:
+advanced testing and development support library:
- `py.test`_: cross-project testing tool with many advanced features
-- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes
-- `py.path`_: path abstractions over local and subversion files
+- `py.path`_: path abstractions over local and subversion files
- `py.code`_: dynamic code compile and traceback printing support
-Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6.
+Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1.
For questions please check out http://pylib.org/contact.html
.. _`py.test`: http://pylib.org/test.html
-.. _`py.execnet`: http://pylib.org/execnet.html
.. _`py.path`: http://pylib.org/path.html
.. _`py.code`: http://pylib.org/code.html
-(c) Holger Krekel and others, 2009
+(c) Holger Krekel and others, 2009
"""
trunk = 'trunk'
@@ -63,8 +61,6 @@ def main():
'py.cmdline',
'py.code',
'py.compat',
- 'py.execnet',
- 'py.execnet.script',
'py.io',
'py.log',
'py.path',
--- a/testing/pytest/dist/test_mypickle.py
+++ b/testing/pytest/dist/test_mypickle.py
@@ -1,6 +1,7 @@
import py
import sys
+import execnet
Queue = py.builtin._tryimport('queue', 'Queue').Queue
@@ -117,7 +118,7 @@ def test_self_memoize():
TESTTIMEOUT = 2.0
class TestPickleChannelFunctional:
def setup_class(cls):
- cls.gw = py.execnet.PopenGateway()
+ cls.gw = execnet.PopenGateway()
cls.gw.remote_init_threads(5)
def test_popen_send_instance(self):
--- a/testing/execnet/conftest.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import py
-
-def pytest_generate_tests(metafunc):
- if 'gw' in metafunc.funcargnames:
- if hasattr(metafunc.cls, 'gwtype'):
- gwtypes = [metafunc.cls.gwtype]
- else:
- gwtypes = ['popen', 'socket', 'ssh']
- for gwtype in gwtypes:
- metafunc.addcall(id=gwtype, param=gwtype)
-
-def pytest_funcarg__gw(request):
- scope = "session"
- if request.param == "popen":
- return request.cached_setup(
- setup=py.execnet.PopenGateway,
- teardown=lambda gw: gw.exit(),
- extrakey=request.param,
- scope=scope)
- elif request.param == "socket":
- return request.cached_setup(
- setup=setup_socket_gateway,
- teardown=teardown_socket_gateway,
- extrakey=request.param,
- scope=scope)
- elif request.param == "ssh":
- return request.cached_setup(
- setup=lambda: setup_ssh_gateway(request),
- teardown=lambda gw: gw.exit(),
- extrakey=request.param,
- scope=scope)
-
-def setup_socket_gateway():
- proxygw = py.execnet.PopenGateway()
- gw = py.execnet.SocketGateway.new_remote(proxygw, ("127.0.0.1", 0))
- gw.proxygw = proxygw
- return gw
-
-def teardown_socket_gateway(gw):
- gw.exit()
- gw.proxygw.exit()
-
-def setup_ssh_gateway(request):
- sshhost = request.getfuncargvalue('specssh').ssh
- gw = py.execnet.SshGateway(sshhost)
- return gw
--- a/testing/execnet/test_rsync.py
+++ /dev/null
@@ -1,148 +0,0 @@
-import py
-from py.execnet import RSync
-
-
-def pytest_funcarg__gw1(request):
- return request.cached_setup(
- setup=py.execnet.PopenGateway,
- teardown=lambda val: val.exit(),
- scope="module"
- )
-pytest_funcarg__gw2 = pytest_funcarg__gw1
-
-def pytest_funcarg__dirs(request):
- t = request.getfuncargvalue('tmpdir')
- class dirs:
- source = t.join("source")
- dest1 = t.join("dest1")
- dest2 = t.join("dest2")
- return dirs
-
-class TestRSync:
- def test_notargets(self, dirs):
- rsync = RSync(dirs.source)
- py.test.raises(IOError, "rsync.send()")
- assert rsync.send(raises=False) is None
-
- def test_dirsync(self, dirs, gw1, gw2):
- dest = dirs.dest1
- dest2 = dirs.dest2
- source = dirs.source
-
- for s in ('content1', 'content2', 'content2-a-bit-longer'):
- source.ensure('subdir', 'file1').write(s)
- rsync = RSync(dirs.source)
- rsync.add_target(gw1, dest)
- rsync.add_target(gw2, dest2)
- rsync.send()
- assert dest.join('subdir').check(dir=1)
- assert dest.join('subdir', 'file1').check(file=1)
- assert dest.join('subdir', 'file1').read() == s
- assert dest2.join('subdir').check(dir=1)
- assert dest2.join('subdir', 'file1').check(file=1)
- assert dest2.join('subdir', 'file1').read() == s
- for x in dest, dest2:
- fn = x.join("subdir", "file1")
- fn.setmtime(0)
-
- source.join('subdir').remove('file1')
- rsync = RSync(source)
- rsync.add_target(gw2, dest2)
- rsync.add_target(gw1, dest)
- rsync.send()
- assert dest.join('subdir', 'file1').check(file=1)
- assert dest2.join('subdir', 'file1').check(file=1)
- rsync = RSync(source)
- rsync.add_target(gw1, dest, delete=True)
- rsync.add_target(gw2, dest2)
- rsync.send()
- assert not dest.join('subdir', 'file1').check()
- assert dest2.join('subdir', 'file1').check()
-
- def test_dirsync_twice(self, dirs, gw1, gw2):
- source = dirs.source
- source.ensure("hello")
- rsync = RSync(source)
- rsync.add_target(gw1, dirs.dest1)
- rsync.send()
- assert dirs.dest1.join('hello').check()
- py.test.raises(IOError, "rsync.send()")
- assert rsync.send(raises=False) is None
- rsync.add_target(gw1, dirs.dest2)
- rsync.send()
- assert dirs.dest2.join('hello').check()
- py.test.raises(IOError, "rsync.send()")
- assert rsync.send(raises=False) is None
-
- def test_rsync_default_reporting(self, capsys, dirs, gw1):
- source = dirs.source
- source.ensure("hello")
- rsync = RSync(source)
- rsync.add_target(gw1, dirs.dest1)
- rsync.send()
- out, err = capsys.readouterr()
- assert out.find("hello") != -1
-
- def test_rsync_non_verbose(self, capsys, dirs, gw1):
- source = dirs.source
- source.ensure("hello")
- rsync = RSync(source, verbose=False)
- rsync.add_target(gw1, dirs.dest1)
- rsync.send()
- out, err = capsys.readouterr()
- assert not out
- assert not err
-
- def test_symlink_rsync(self, dirs, gw1):
- if py.std.sys.platform == 'win32':
- py.test.skip("symlinks are unsupported on Windows.")
- source = dirs.source
- dest = dirs.dest1
- dirs.source.ensure("existant")
- source.join("rellink").mksymlinkto(source.join("existant"), absolute=0)
- source.join('abslink').mksymlinkto(source.join("existant"))
-
- rsync = RSync(source)
- rsync.add_target(gw1, dest)
- rsync.send()
-
- assert dest.join('rellink').readlink() == dest.join("existant")
- assert dest.join('abslink').readlink() == dest.join("existant")
-
- def test_callback(self, dirs, gw1):
- dest = dirs.dest1
- source = dirs.source
- source.ensure("existant").write("a" * 100)
- source.ensure("existant2").write("a" * 10)
- total = {}
- def callback(cmd, lgt, channel):
- total[(cmd, lgt)] = True
-
- rsync = RSync(source, callback=callback)
- #rsync = RSync()
- rsync.add_target(gw1, dest)
- rsync.send()
-
- assert total == {("list", 110):True, ("ack", 100):True, ("ack", 10):True}
-
- def test_file_disappearing(self, dirs, gw1):
- dest = dirs.dest1
- source = dirs.source
- source.ensure("ex").write("a" * 100)
- source.ensure("ex2").write("a" * 100)
-
- class DRsync(RSync):
- def filter(self, x):
- assert x != source
- if x.endswith("ex2"):
- self.x = 1
- source.join("ex2").remove()
- return True
-
- rsync = DRsync(source)
- rsync.add_target(gw1, dest)
- rsync.send()
- assert rsync.x == 1
- assert len(dest.listdir()) == 1
- assert len(source.listdir()) == 1
-
--- a/testing/execnet/test_basics.py
+++ /dev/null
@@ -1,198 +0,0 @@
-
-import py
-import sys, os, subprocess, inspect
-from py.__.execnet import gateway_base, gateway
-from py.__.execnet.gateway_base import Message, Channel, ChannelFactory
-
-def test_subprocess_interaction(anypython):
- line = gateway.popen_bootstrapline
- compile(line, 'xyz', 'exec')
- args = [str(anypython), '-c', line]
- popen = subprocess.Popen(args, bufsize=0, stderr=subprocess.STDOUT,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
- def send(line):
- popen.stdin.write(line.encode('ascii'))
- if sys.version_info > (3,0): # 3k still buffers
- popen.stdin.flush()
- def receive():
- return popen.stdout.readline().decode('ascii')
-
- try:
- source = py.code.Source(read_write_loop, "read_write_loop()")
- repr_source = repr(str(source)) + "\n"
- sendline = repr_source
- send(sendline)
- s = receive()
- assert s == "ok\n"
- send("hello\n")
- s = receive()
- assert s == "received: hello\n"
- send("world\n")
- s = receive()
- assert s == "received: world\n"
- finally:
- popen.stdin.close()
- popen.stdout.close()
- popen.wait()
-
-def read_write_loop():
- import os, sys
- sys.stdout.write("ok\n")
- sys.stdout.flush()
- while 1:
- try:
- line = sys.stdin.readline()
- sys.stdout.write("received: %s" % line)
- sys.stdout.flush()
- except (IOError, EOFError):
- break
-
-def pytest_generate_tests(metafunc):
- if 'anypython' in metafunc.funcargnames:
- for name in 'python3.1', 'python2.4', 'python2.5', 'python2.6':
- metafunc.addcall(id=name, param=name)
-
-def pytest_funcarg__anypython(request):
- name = request.param
- executable = py.path.local.sysfind(name)
- if executable is None:
- py.test.skip("no %s found" % (name,))
- return executable
-
-def test_io_message(anypython, tmpdir):
- check = tmpdir.join("check.py")
- check.write(py.code.Source(gateway_base, """
- try:
- from io import BytesIO
- except ImportError:
- from StringIO import StringIO as BytesIO
- import tempfile
- temp_out = BytesIO()
- temp_in = BytesIO()
- io = Popen2IO(temp_out, temp_in)
- for i, msg_cls in Message._types.items():
- print ("checking %s %s" %(i, msg_cls))
- for data in "hello", "hello".encode('ascii'):
- msg1 = msg_cls(i, data)
- msg1.writeto(io)
- x = io.outfile.getvalue()
- io.outfile.truncate(0)
- io.outfile.seek(0)
- io.infile.seek(0)
- io.infile.write(x)
- io.infile.seek(0)
- msg2 = Message.readfrom(io)
- assert msg1.channelid == msg2.channelid, (msg1, msg2)
- assert msg1.data == msg2.data
- print ("all passed")
- """))
- #out = py.process.cmdexec("%s %s" %(executable,check))
- out = anypython.sysexec(check)
- print (out)
- assert "all passed" in out
-
-def test_popen_io(anypython, tmpdir):
- check = tmpdir.join("check.py")
- check.write(py.code.Source(gateway_base, """
- do_exec(Popen2IO.server_stmt, globals())
- io.write("hello".encode('ascii'))
- s = io.read(1)
- assert s == "x".encode('ascii')
- """))
- from subprocess import Popen, PIPE
- args = [str(anypython), str(check)]
- proc = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- proc.stdin.write("x".encode('ascii'))
- stdout, stderr = proc.communicate()
- print (stderr)
- ret = proc.wait()
- assert "hello".encode('ascii') in stdout
-
-
-def test_rinfo_source(anypython, tmpdir):
- check = tmpdir.join("check.py")
- check.write(py.code.Source("""
- class Channel:
- def send(self, data):
- assert eval(repr(data), {}) == data
- channel = Channel()
- """, gateway.rinfo_source, """
- print ('all passed')
- """))
- out = anypython.sysexec(check)
- print (out)
- assert "all passed" in out
-
-def test_geterrortext(anypython, tmpdir):
- check = tmpdir.join("check.py")
- check.write(py.code.Source(gateway_base, """
- class Arg:
- pass
- errortext = geterrortext((Arg, "1", 4))
- assert "Arg" in errortext
- import sys
- try:
- raise ValueError("17")
- except ValueError:
- excinfo = sys.exc_info()
- s = geterrortext(excinfo)
- assert "17" in s
- print ("all passed")
- """))
- out = anypython.sysexec(check)
- print (out)
- assert "all passed" in out
-
-def test_stdouterrin_setnull():
- cap = py.io.StdCaptureFD()
- from py.__.execnet.gateway import stdouterrin_setnull
- stdouterrin_setnull()
- import os
- os.write(1, "hello".encode('ascii'))
- if os.name == "nt":
- os.write(2, "world")
- os.read(0, 1)
- out, err = cap.reset()
- assert not out
- assert not err
-
-
-class TestMessage:
- def test_wire_protocol(self):
- for cls in Message._types.values():
- one = py.io.BytesIO()
- data = '23'.encode('ascii')
- cls(42, data).writeto(one)
- two = py.io.BytesIO(one.getvalue())
- msg = Message.readfrom(two)
- assert isinstance(msg, cls)
- assert msg.channelid == 42
- assert msg.data == data
- assert isinstance(repr(msg), str)
- # == "<Message.%s channelid=42 '23'>" %(msg.__class__.__name__, )
-
-class TestPureChannel:
- def setup_method(self, method):
- self.fac = ChannelFactory(None)
-
- def test_factory_create(self):
- chan1 = self.fac.new()
- assert chan1.id == 1
- chan2 = self.fac.new()
- assert chan2.id == 3
-
- def test_factory_getitem(self):
- chan1 = self.fac.new()
- assert self.fac._channels[chan1.id] == chan1
- chan2 = self.fac.new()
- assert self.fac._channels[chan2.id] == chan2
-
- def test_channel_timeouterror(self):
- channel = self.fac.new()
- py.test.raises(IOError, channel.waitclose, timeout=0.01)
-
- def test_channel_makefile_incompatmode(self):
- channel = self.fac.new()
- py.test.raises(ValueError, 'channel.makefile("rw")')
-
-
--- a/py/execnet/script/socketserver.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#! /usr/bin/env python
-
-"""
- start socket based minimal readline exec server
-"""
-# this part of the program only executes on the server side
-#
-
-progname = 'socket_readline_exec_server-1.2'
-
-import sys, socket, os
-try:
- import fcntl
-except ImportError:
- fcntl = None
-
-debug = 0
-
-if debug: # and not os.isatty(sys.stdin.fileno()):
- f = open('/tmp/execnet-socket-pyout.log', 'w')
- old = sys.stdout, sys.stderr
- sys.stdout = sys.stderr = f
- #import py
- #compile = py.code.compile
-
-def print_(*args):
- print(" ".join(str(arg) for arg in args))
-
-if sys.version_info > (3, 0):
- exec("""def exec_(source, locs):
- exec(source, locs)""")
-else:
- exec("""def exec_(source, locs):
- exec source in locs""")
-
-def exec_from_one_connection(serversock):
- print_(progname, 'Entering Accept loop', serversock.getsockname())
- clientsock,address = serversock.accept()
- print_(progname, 'got new connection from %s %s' % address)
- clientfile = clientsock.makefile('rb')
- print_("reading line")
- # rstrip so that we can use \r\n for telnet testing
- source = clientfile.readline().rstrip()
- clientfile.close()
- g = {'clientsock' : clientsock, 'address' : address}
- source = eval(source)
- if source:
- co = compile(source+'\n', source, 'exec')
- print_(progname, 'compiled source, executing')
- try:
- exec_(co, g)
- finally:
- print_(progname, 'finished executing code')
- # background thread might hold a reference to this (!?)
- #clientsock.close()
-
-def bind_and_listen(hostport):
- if isinstance(hostport, str):
- host, port = hostport.split(':')
- hostport = (host, int(port))
- serversock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- # set close-on-exec
- if hasattr(fcntl, 'FD_CLOEXEC'):
- old = fcntl.fcntl(serversock.fileno(), fcntl.F_GETFD)
- fcntl.fcntl(serversock.fileno(), fcntl.F_SETFD, old | fcntl.FD_CLOEXEC)
- # allow the address to be re-used in a reasonable amount of time
- if os.name == 'posix' and sys.platform != 'cygwin':
- serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- serversock.bind(hostport)
- serversock.listen(5)
- return serversock
-
-def startserver(serversock, loop=False):
- try:
- while 1:
- try:
- exec_from_one_connection(serversock)
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- if debug:
- import traceback
- traceback.print_exc()
- else:
- excinfo = sys.exc_info()
- print_("got exception", excinfo[1])
- if not loop:
- break
- finally:
- print_("leaving socketserver execloop")
- serversock.shutdown(2)
-
-if __name__ == '__main__':
- import sys
- if len(sys.argv)>1:
- hostport = sys.argv[1]
- else:
- hostport = ':8888'
- serversock = bind_and_listen(hostport)
- startserver(serversock, loop=False)
-
--- a/bin-for-dist/test_install.py
+++ b/bin-for-dist/test_install.py
@@ -78,7 +78,7 @@ class VirtualEnv(object):
def makegateway(self):
python = self._cmd('python')
- return py.execnet.makegateway("popen//python=%s" %(python,))
+ return execnet.makegateway("popen//python=%s" %(python,))
def pcall(self, cmd, *args, **kw):
self.ensure()
--- a/py/test/looponfail/remote.py
+++ b/py/test/looponfail/remote.py
@@ -7,10 +7,9 @@
otherwise changes to source code can crash
the controlling process which should never happen.
"""
-
-from __future__ import generators
import py
import sys
+import execnet
from py.__.test.session import Session
from py.__.test.dist.mypickle import PickleChannel
from py.__.test.looponfail import util
@@ -55,7 +54,7 @@ class RemoteControl(object):
py.builtin.print_("RemoteControl:", msg)
def initgateway(self):
- return py.execnet.PopenGateway()
+ return execnet.PopenGateway()
def setup(self, out=None):
if out is None:
--- a/testing/pytest/dist/test_txnode.py
+++ b/testing/pytest/dist/test_txnode.py
@@ -1,5 +1,6 @@
import py
+import execnet
from py.__.test.dist.txnode import TXNode
queue = py.builtin._tryimport("queue", "Queue")
Queue = queue.Queue
@@ -46,8 +47,8 @@ class MySetup:
config = py.test.config._reparse([])
self.config = config
self.queue = Queue()
- self.xspec = py.execnet.XSpec("popen")
- self.gateway = py.execnet.makegateway(self.xspec)
+ self.xspec = execnet.XSpec("popen")
+ self.gateway = execnet.makegateway(self.xspec)
self.id += 1
self.gateway.id = str(self.id)
self.node = TXNode(self.gateway, self.config, putevent=self.queue.put)
--- a/example/execnet/popen_read_multiple.py
+++ b/example/execnet/popen_read_multiple.py
@@ -9,7 +9,7 @@ NUM_PROCESSES = 5
channels = []
for i in range(NUM_PROCESSES):
- gw = py.execnet.PopenGateway() # or use SSH or socket gateways
+ gw = execnet.PopenGateway() # or use SSH or socket gateways
channel = gw.remote_exec("""
import time
secs = channel.receive()
@@ -19,7 +19,7 @@ for i in range(NUM_PROCESSES):
channels.append(channel)
print "*** instantiated subprocess", gw
-mc = py.execnet.MultiChannel(channels)
+mc = execnet.MultiChannel(channels)
queue = mc.make_receive_queue()
print "***", "verifying that timeout on receiving results from blocked subprocesses works"
--- a/py/path/gateway/channeltest.py
+++ b/py/path/gateway/channeltest.py
@@ -52,7 +52,7 @@ class PathServer:
if __name__ == '__main__':
import py
- gw = py.execnet.PopenGateway()
+ gw = execnet.PopenGateway()
channel = gw._channelfactory.new()
srv = PathServer(channel)
c = gw.remote_exec("""
--- a/doc/changelog.txt
+++ b/doc/changelog.txt
@@ -1,6 +1,8 @@
Changes between 1.0.x and 'trunk'
=====================================
+* remove py.execnet code and substitute all usages with 'execnet' proper
+
* fix issue50 - cached_setup now caches more to expectations
for test functions with multiple arguments.
--- a/py/execnet/script/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- a/py/execnet/script/xx.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import rlcompleter2
-rlcompleter2.setup()
-
-import register, sys
-try:
- hostport = sys.argv[1]
-except:
- hostport = ':8888'
-gw = register.ServerGateway(hostport)
--- a/testing/execnet/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-#
--- a/py/execnet/serializer.py
+++ /dev/null
@@ -1,272 +0,0 @@
-"""
-Simple marshal format (based on pickle) designed to work across Python versions.
-"""
-
-import sys
-import struct
-
-_INPY3 = _REALLY_PY3 = sys.version_info > (3, 0)
-
-class SerializeError(Exception):
- pass
-
-class SerializationError(SerializeError):
- """Error while serializing an object."""
-
-class UnserializableType(SerializationError):
- """Can't serialize a type."""
-
-class UnserializationError(SerializeError):
- """Error while unserializing an object."""
-
-class VersionMismatch(UnserializationError):
- """Data from a previous or later format."""
-
-class Corruption(UnserializationError):
- """The pickle format appears to have been corrupted."""
-
-if _INPY3:
- def b(s):
- return s.encode("ascii")
-else:
- b = str
-
-FOUR_BYTE_INT_MAX = 2147483647
-
-_int4_format = struct.Struct("!i")
-_float_format = struct.Struct("!d")
-
-# Protocol constants
-VERSION_NUMBER = 1
-VERSION = b(chr(VERSION_NUMBER))
-PY2STRING = b('s')
-PY3STRING = b('t')
-UNICODE = b('u')
-BYTES = b('b')
-NEWLIST = b('l')
-BUILDTUPLE = b('T')
-SETITEM = b('m')
-NEWDICT = b('d')
-INT = b('i')
-FLOAT = b('f')
-STOP = b('S')
-
-class CrossVersionOptions(object):
- pass
-
-class Serializer(object):
-
- def __init__(self, stream):
- self.stream = stream
-
- def save(self, obj):
- self.stream.write(VERSION)
- self._save(obj)
- self.stream.write(STOP)
-
- def _save(self, obj):
- tp = type(obj)
- try:
- dispatch = self.dispatch[tp]
- except KeyError:
- raise UnserializableType("can't serialize %s" % (tp,))
- dispatch(self, obj)
-
- dispatch = {}
-
- def save_bytes(self, bytes_):
- self.stream.write(BYTES)
- self._write_byte_sequence(bytes_)
- dispatch[bytes] = save_bytes
-
- if _INPY3:
- def save_string(self, s):
- self.stream.write(PY3STRING)
- self._write_unicode_string(s)
- else:
- def save_string(self, s):
- self.stream.write(PY2STRING)
- self._write_byte_sequence(s)
-
- def save_unicode(self, s):
- self.stream.write(UNICODE)
- self._write_unicode_string(s)
- dispatch[unicode] = save_unicode
- dispatch[str] = save_string
-
- def _write_unicode_string(self, s):
- try:
- as_bytes = s.encode("utf-8")
- except UnicodeEncodeError:
- raise SerializationError("strings must be utf-8 encodable")
- self._write_byte_sequence(as_bytes)
-
- def _write_byte_sequence(self, bytes_):
- self._write_int4(len(bytes_), "string is too long")
- self.stream.write(bytes_)
-
- def save_int(self, i):
- self.stream.write(INT)
- self._write_int4(i)
- dispatch[int] = save_int
-
- def save_float(self, flt):
- self.stream.write(FLOAT)
- self.stream.write(_float_format.pack(flt))
- dispatch[float] = save_float
-
- def _write_int4(self, i, error="int must be less than %i" %
- (FOUR_BYTE_INT_MAX,)):
- if i > FOUR_BYTE_INT_MAX:
- raise SerializationError(error)
- self.stream.write(_int4_format.pack(i))
-
- def save_list(self, L):
- self.stream.write(NEWLIST)
- self._write_int4(len(L), "list is too long")
- for i, item in enumerate(L):
- self._write_setitem(i, item)
- dispatch[list] = save_list
-
- def _write_setitem(self, key, value):
- self._save(key)
- self._save(value)
- self.stream.write(SETITEM)
-
- def save_dict(self, d):
- self.stream.write(NEWDICT)
- for key, value in d.items():
- self._write_setitem(key, value)
- dispatch[dict] = save_dict
-
- def save_tuple(self, tup):
- for item in tup:
- self._save(item)
- self.stream.write(BUILDTUPLE)
- self._write_int4(len(tup), "tuple is too long")
- dispatch[tuple] = save_tuple
-
-
-class _UnserializationOptions(object):
- pass
-
-class _Py2UnserializationOptions(_UnserializationOptions):
-
- def __init__(self, py3_strings_as_str=False):
- self.py3_strings_as_str = py3_strings_as_str
-
-class _Py3UnserializationOptions(_UnserializationOptions):
-
- def __init__(self, py2_strings_as_str=False):
- self.py2_strings_as_str = py2_strings_as_str
-
-if _INPY3:
- UnserializationOptions = _Py3UnserializationOptions
-else:
- UnserializationOptions = _Py2UnserializationOptions
-
-class _Stop(Exception):
- pass
-
-class Unserializer(object):
-
- def __init__(self, stream, options=UnserializationOptions()):
- self.stream = stream
- self.options = options
-
- def load(self):
- self.stack = []
- version = ord(self.stream.read(1))
- if version != VERSION_NUMBER:
- raise VersionMismatch("%i != %i" % (version, VERSION_NUMBER))
- try:
- while True:
- opcode = self.stream.read(1)
- if not opcode:
- raise EOFError
- try:
- loader = self.opcodes[opcode]
- except KeyError:
- raise Corruption("unkown opcode %s" % (opcode,))
- loader(self)
- except _Stop:
- if len(self.stack) != 1:
- raise UnserializationError("internal unserialization error")
- return self.stack[0]
- else:
- raise Corruption("didn't get STOP")
-
- opcodes = {}
-
- def load_int(self):
- i = self._read_int4()
- self.stack.append(i)
- opcodes[INT] = load_int
-
- def load_float(self):
- binary = self.stream.read(_float_format.size)
- self.stack.append(_float_format.unpack(binary)[0])
- opcodes[FLOAT] = load_float
-
- def _read_int4(self):
- return _int4_format.unpack(self.stream.read(4))[0]
-
- def _read_byte_string(self):
- length = self._read_int4()
- as_bytes = self.stream.read(length)
- return as_bytes
-
- def load_py3string(self):
- as_bytes = self._read_byte_string()
- if not _INPY3 and self.options.py3_strings_as_str:
- # XXX Should we try to decode into latin-1?
- self.stack.append(as_bytes)
- else:
- self.stack.append(as_bytes.decode("utf-8"))
- opcodes[PY3STRING] = load_py3string
-
- def load_py2string(self):
- as_bytes = self._read_byte_string()
- if _INPY3 and self.options.py2_strings_as_str:
- s = as_bytes.decode("latin-1")
- else:
- s = as_bytes
- self.stack.append(s)
- opcodes[PY2STRING] = load_py2string
-
- def load_bytes(self):
- s = self._read_byte_string()
- self.stack.append(s)
- opcodes[BYTES] = load_bytes
-
- def load_unicode(self):
- self.stack.append(self._read_byte_string().decode("utf-8"))
- opcodes[UNICODE] = load_unicode
-
- def load_newlist(self):
- length = self._read_int4()
- self.stack.append([None] * length)
- opcodes[NEWLIST] = load_newlist
-
- def load_setitem(self):
- if len(self.stack) < 3:
- raise Corruption("not enough items for setitem")
- value = self.stack.pop()
- key = self.stack.pop()
- self.stack[-1][key] = value
- opcodes[SETITEM] = load_setitem
-
- def load_newdict(self):
- self.stack.append({})
- opcodes[NEWDICT] = load_newdict
-
- def load_buildtuple(self):
- length = self._read_int4()
- tup = tuple(self.stack[-length:])
- del self.stack[-length:]
- self.stack.append(tup)
- opcodes[BUILDTUPLE] = load_buildtuple
-
- def load_stop(self):
- raise _Stop
- opcodes[STOP] = load_stop
--- a/conftest.py
+++ b/conftest.py
@@ -17,15 +17,18 @@ def pytest_addoption(parser):
def pytest_funcarg__specssh(request):
return getspecssh(request.config)
-def pytest_funcarg__specsocket(request):
- return getsocketspec(request.config)
+def getgspecs(config=None):
+ if config is None:
+ config = py.test.config
+ return [execnet.XSpec(spec)
+ for spec in config.getvalueorskip("gspecs")]
# configuration information for tests
def getgspecs(config=None):
if config is None:
config = py.test.config
- return [py.execnet.XSpec(spec)
+ return [execnet.XSpec(spec)
for spec in config.getvalueorskip("gspecs")]
def getspecssh(config=None):
--- a/py/execnet/script/shell.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#! /usr/bin/env python
-"""
-a remote python shell
-
-for injection into startserver.py
-"""
-import sys, os, socket, select
-
-try:
- clientsock
-except NameError:
- print("client side starting")
- import sys
- host, port = sys.argv[1].split(':')
- port = int(port)
- myself = open(os.path.abspath(sys.argv[0]), 'rU').read()
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((host, port))
- sock.sendall(repr(myself)+'\n')
- print("send boot string")
- inputlist = [ sock, sys.stdin ]
- try:
- while 1:
- r,w,e = select.select(inputlist, [], [])
- if sys.stdin in r:
- line = raw_input()
- sock.sendall(line + '\n')
- if sock in r:
- line = sock.recv(4096)
- sys.stdout.write(line)
- sys.stdout.flush()
- except:
- import traceback
- print(traceback.print_exc())
-
- sys.exit(1)
-
-print("server side starting")
-# server side
-#
-from traceback import print_exc
-from threading import Thread
-
-class promptagent(Thread):
- def __init__(self, clientsock):
- Thread.__init__(self)
- self.clientsock = clientsock
-
- def run(self):
- print("Entering thread prompt loop")
- clientfile = self.clientsock.makefile('w')
-
- filein = self.clientsock.makefile('r')
- loc = self.clientsock.getsockname()
-
- while 1:
- try:
- clientfile.write('%s %s >>> ' % loc)
- clientfile.flush()
- line = filein.readline()
- if len(line)==0: raise EOFError("nothing")
- #print >>sys.stderr,"got line: " + line
- if line.strip():
- oldout, olderr = sys.stdout, sys.stderr
- sys.stdout, sys.stderr = clientfile, clientfile
- try:
- try:
- exec(compile(line + '\n','<remote pyin>', 'single'))
- except:
- print_exc()
- finally:
- sys.stdout=oldout
- sys.stderr=olderr
- clientfile.flush()
- except EOFError:
- e = sys.exc_info()[1]
- sys.stderr.write("connection close, prompt thread returns")
- break
- #print >>sys.stdout, "".join(apply(format_exception,sys.exc_info()))
-
- self.clientsock.close()
-
-prompter = promptagent(clientsock)
-prompter.start()
-print("promptagent - thread started")
--- a/py/__init__.py
+++ b/py/__init__.py
@@ -1,22 +1,20 @@
# -*- coding: utf-8 -*-
"""
-advanced testing and development support library:
+advanced testing and development support library:
- `py.test`_: cross-project testing tool with many advanced features
-- `py.execnet`_: ad-hoc code distribution to SSH, Socket and local sub processes
-- `py.path`_: path abstractions over local and subversion files
+- `py.path`_: path abstractions over local and subversion files
- `py.code`_: dynamic code compile and traceback printing support
-Compatibility: Linux, Win32, OSX, Python versions 2.3-2.6.
+Compatibility: Linux, Win32, OSX, Python versions 2.4 through to 3.1.
For questions please check out http://pylib.org/contact.html
.. _`py.test`: http://pylib.org/test.html
-.. _`py.execnet`: http://pylib.org/execnet.html
.. _`py.path`: http://pylib.org/path.html
.. _`py.code`: http://pylib.org/code.html
-(c) Holger Krekel and others, 2009
+(c) Holger Krekel and others, 2009
"""
from py.initpkg import initpkg
trunk = "trunk"
@@ -159,21 +157,6 @@ initpkg(__name__,
'builtin.execfile' : ('./builtin/builtin31.py', 'execfile'),
'builtin.callable' : ('./builtin/builtin31.py', 'callable'),
- # gateways into remote contexts
- 'execnet.__doc__' : ('./execnet/__init__.py', '__doc__'),
- 'execnet._HookSpecs' : ('./execnet/gateway_base.py', 'ExecnetAPI'),
- 'execnet.SocketGateway' : ('./execnet/gateway.py', 'SocketGateway'),
- 'execnet.PopenGateway' : ('./execnet/gateway.py', 'PopenGateway'),
- 'execnet.SshGateway' : ('./execnet/gateway.py', 'SshGateway'),
- 'execnet.HostNotFound' : ('./execnet/gateway.py', 'HostNotFound'),
- 'execnet.XSpec' : ('./execnet/xspec.py', 'XSpec'),
- 'execnet.makegateway' : ('./execnet/xspec.py', 'makegateway'),
- 'execnet.MultiGateway' : ('./execnet/multi.py', 'MultiGateway'),
- 'execnet.MultiChannel' : ('./execnet/multi.py', 'MultiChannel'),
-
- # execnet scripts
- 'execnet.RSync' : ('./execnet/rsync.py', 'RSync'),
-
# input-output helping
'io.__doc__' : ('./io/__init__.py', '__doc__'),
'io.dupfile' : ('./io/capture.py', 'dupfile'),
--- a/py/test/dist/gwmanage.py
+++ b/py/test/dist/gwmanage.py
@@ -4,7 +4,8 @@
import py
import sys, os
-from py.__.execnet.gateway_base import RemoteError
+import execnet
+from execnet.gateway_base import RemoteError
class GatewayManager:
RemoteError = RemoteError
@@ -13,8 +14,8 @@ class GatewayManager:
self.specs = []
self.hook = hook
for spec in specs:
- if not isinstance(spec, py.execnet.XSpec):
- spec = py.execnet.XSpec(spec)
+ if not isinstance(spec, execnet.XSpec):
+ spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
self.specs.append(spec)
@@ -22,7 +23,7 @@ class GatewayManager:
def makegateways(self):
assert not self.gateways
for spec in self.specs:
- gw = py.execnet.makegateway(spec)
+ gw = execnet.makegateway(spec)
self.gateways.append(gw)
gw.id = "[%s]" % len(self.gateways)
self.hook.pytest_gwmanage_newgateway(
@@ -39,7 +40,7 @@ class GatewayManager:
else:
if remote:
l.append(gw)
- return py.execnet.MultiGateway(gateways=l)
+ return execnet.MultiGateway(gateways=l)
def multi_exec(self, source, inplacelocal=True):
""" remote execute code on all gateways.
@@ -87,7 +88,7 @@ class GatewayManager:
gw = self.gateways.pop()
gw.exit()
-class HostRSync(py.execnet.RSync):
+class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
--- a/doc/test/customize.txt
+++ b/doc/test/customize.txt
@@ -364,7 +364,7 @@ remote environment. For this you can im
def pytest_gwmanage_newgateway(gateway, platinfo):
""" called after a gateway is instantiated. """
-The ``gateway`` object here has a ``spec`` attribute which is an ``py.execnet.XSpec``
+The ``gateway`` object here has a ``spec`` attribute which is an ``execnet.XSpec``
object, which has attributes that map key/values as specified from a ``--txspec``
option. The platinfo object is a dictionary with information about the remote process:
--- a/py/execnet/gateway_base.py
+++ /dev/null
@@ -1,757 +0,0 @@
-"""
-base execnet gateway code, a quick overview.
-
-the code of this module is sent to the "other side"
-as a means of bootstrapping a Gateway object
-capable of receiving and executing code,
-and routing data through channels.
-
-Gateways operate on InputOutput objects offering
-a write and a read(n) method.
-
-Once bootstrapped a higher level protocol
-based on Messages is used. Messages are serialized
-to and from InputOutput objects. The details of this protocol
-are locally defined in this module. There is no need
-for standardizing or versioning the protocol.
-
-After bootstrapping the BaseGateway opens a receiver thread which
-accepts encoded messages and triggers actions to interpret them.
-Sending of channel data items happens directly through
-write operations to InputOutput objects so there is no
-separate thread.
-
-Code execution messages are put into an execqueue from
-which they will be taken for execution. gateway.serve()
-will take and execute such items, one by one. This means
-that by incoming default execution is single-threaded.
-
-The receiver thread terminates if the remote side sends
-a gateway termination message or if the IO-connection drops.
-It puts an end symbol into the execqueue so
-that serve() can cleanly finish as well.
-
-(C) 2004-2009 Holger Krekel, Armin Rigo and others
-"""
-import sys, os, weakref
-import threading, traceback, socket, struct
-try:
- import queue
-except ImportError:
- import Queue as queue
-
-if sys.version_info > (3, 0):
- exec("""def do_exec(co, loc):
- exec(co, loc)""")
- unicode = str
-else:
- exec("""def do_exec(co, loc):
- exec co in loc""")
- bytes = str
-
-
-def str(*args):
- raise EnvironmentError(
- "use unicode or bytes, not cross-python ambigous 'str'")
-
-default_encoding = "UTF-8"
-sysex = (KeyboardInterrupt, SystemExit)
-
-debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'w')
-
-
-# ___________________________________________________________________________
-#
-# input output classes
-# ___________________________________________________________________________
-
-class SocketIO:
- server_stmt = "io = SocketIO(clientsock)"
-
- error = (socket.error, EOFError)
- def __init__(self, sock):
- self.sock = sock
- try:
- sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
- sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)# IPTOS_LOWDELAY
- except socket.error:
- e = sys.exc_info()[1]
- sys.stderr.write("WARNING: cannot set socketoption")
- self.readable = self.writeable = True
-
- def read(self, numbytes):
- "Read exactly 'bytes' bytes from the socket."
- buf = bytes()
- while len(buf) < numbytes:
- t = self.sock.recv(numbytes - len(buf))
- if not t:
- raise EOFError
- buf += t
- return buf
-
- def write(self, data):
- assert isinstance(data, bytes)
- self.sock.sendall(data)
-
- def close_read(self):
- if self.readable:
- try:
- self.sock.shutdown(0)
- except socket.error:
- pass
- self.readable = None
- def close_write(self):
- if self.writeable:
- try:
- self.sock.shutdown(1)
- except socket.error:
- pass
- self.writeable = None
-
-class Popen2IO:
- server_stmt = """
-import os, sys, tempfile
-io = Popen2IO(sys.stdout, sys.stdin)
-sys.stdout = tempfile.TemporaryFile('w')
-sys.stdin = tempfile.TemporaryFile('r')
-"""
- error = (IOError, OSError, EOFError)
-
- def __init__(self, outfile, infile):
- # we need raw byte streams
- self.outfile, self.infile = outfile, infile
- if sys.platform == "win32":
- import msvcrt
- msvcrt.setmode(infile.fileno(), os.O_BINARY)
- msvcrt.setmode(outfile.fileno(), os.O_BINARY)
- self.readable = self.writeable = True
-
- def read(self, numbytes):
- """Read exactly 'numbytes' bytes from the pipe. """
- try:
- data = self.infile.buffer.read(numbytes)
- except AttributeError:
- data = self.infile.read(numbytes)
- if len(data) < numbytes:
- raise EOFError
- return data
-
- def write(self, data):
- """write out all data bytes. """
- assert isinstance(data, bytes)
- try:
- self.outfile.buffer.write(data)
- except AttributeError:
- self.outfile.write(data)
- self.outfile.flush()
-
- def close_read(self):
- if self.readable:
- self.infile.close()
- self.readable = None
-
- def close_write(self):
- try:
- self.outfile.close()
- except EnvironmentError:
- pass
- self.writeable = None
-
-# ___________________________________________________________________________
-#
-# Messages
-# ___________________________________________________________________________
-# the header format
-HDR_FORMAT = "!hhii"
-HDR_SIZE = struct.calcsize(HDR_FORMAT)
-
-is3k = sys.version_info >= (3,0)
-
-class Message:
- """ encapsulates Messages and their wire protocol. """
- _types = {}
- def __init__(self, channelid=0, data=''):
- self.channelid = channelid
- self.data = data
-
- def writeto(self, io):
- # XXX marshal.dumps doesn't work for exchanging data across Python
- # version :-((( XXX check this statement wrt python2.4 through 3.1
- data = self.data
- if isinstance(data, bytes):
- dataformat = 1 + int(is3k)
- else:
- if isinstance(data, unicode):
- dataformat = 3
- else:
- data = repr(self.data) # argh
- dataformat = 4
- data = data.encode(default_encoding)
- header = struct.pack(HDR_FORMAT, self.msgtype, dataformat,
- self.channelid, len(data))
- io.write(header + data)
-
- def readfrom(cls, io):
- header = io.read(HDR_SIZE)
- (msgtype, dataformat,
- senderid, stringlen) = struct.unpack(HDR_FORMAT, header)
- data = io.read(stringlen)
- if dataformat == 1:
- if is3k:
- # remote was python2-str, we are 3k-text
- data = data.decode(default_encoding)
- elif dataformat == 2:
- # remote was python3-bytes
- pass
- else:
- data = data.decode(default_encoding)
- if dataformat == 3:
- pass
- elif dataformat == 4:
- data = eval(data, {}) # reversed argh
- else:
- raise ValueError("bad data format")
- return cls._types[msgtype](senderid, data)
- readfrom = classmethod(readfrom)
-
- def __repr__(self):
- r = repr(self.data)
- if len(r) > 50:
- return "<Message.%s channelid=%d len=%d>" %(self.__class__.__name__,
- self.channelid, len(r))
- else:
- return "<Message.%s channelid=%d %r>" %(self.__class__.__name__,
- self.channelid, self.data)
-
-def _setupmessages():
- class CHANNEL_OPEN(Message):
- def received(self, gateway):
- channel = gateway._channelfactory.new(self.channelid)
- gateway._local_schedulexec(channel=channel, sourcetask=self.data)
-
- class CHANNEL_NEW(Message):
- def received(self, gateway):
- """ receive a remotely created new (sub)channel. """
- newid = self.data
- newchannel = gateway._channelfactory.new(newid)
- gateway._channelfactory._local_receive(self.channelid, newchannel)
-
- class CHANNEL_DATA(Message):
- def received(self, gateway):
- gateway._channelfactory._local_receive(self.channelid, self.data)
-
- class CHANNEL_CLOSE(Message):
- def received(self, gateway):
- gateway._channelfactory._local_close(self.channelid)
-
- class CHANNEL_CLOSE_ERROR(Message):
- def received(self, gateway):
- remote_error = gateway._channelfactory.RemoteError(self.data)
- gateway._channelfactory._local_close(self.channelid, remote_error)
-
- class CHANNEL_LAST_MESSAGE(Message):
- def received(self, gateway):
- gateway._channelfactory._local_close(self.channelid, sendonly=True)
-
- classes = [CHANNEL_OPEN, CHANNEL_NEW, CHANNEL_DATA,
- CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE]
-
- for i, cls in enumerate(classes):
- Message._types[i] = cls
- cls.msgtype = i
- setattr(Message, cls.__name__, cls)
-
-_setupmessages()
-
-def geterrortext(excinfo):
- try:
- l = traceback.format_exception(*excinfo)
- errortext = "".join(l)
- except sysex:
- raise
- except:
- errortext = '%s: %s' % (excinfo[0].__name__,
- excinfo[1])
- return errortext
-
-class RemoteError(EOFError):
- """ Contains an Exceptions from the other side. """
- def __init__(self, formatted):
- self.formatted = formatted
- EOFError.__init__(self)
-
- def __str__(self):
- return self.formatted
-
- def __repr__(self):
- return "%s: %s" %(self.__class__.__name__, self.formatted)
-
- def warn(self):
- # XXX do this better
- sys.stderr.write("Warning: unhandled %r\n" % (self,))
-
-
-NO_ENDMARKER_WANTED = object()
-
-class Channel(object):
- """Communication channel between two possibly remote threads of code. """
- RemoteError = RemoteError
-
- def __init__(self, gateway, id):
- assert isinstance(id, int)
- self.gateway = gateway
- self.id = id
- self._items = queue.Queue()
- self._closed = False
- self._receiveclosed = threading.Event()
- self._remoteerrors = []
-
- def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
- # we first execute the callback on all already received
- # items. We need to hold the receivelock to prevent
- # race conditions with newly arriving items.
- # after having cleared the queue we register
- # the callback only if the channel is not closed already.
- _callbacks = self.gateway._channelfactory._callbacks
- _receivelock = self.gateway._receivelock
- _receivelock.acquire()
- try:
- if self._items is None:
- raise IOError("%r has callback already registered" %(self,))
- items = self._items
- self._items = None
- while 1:
- try:
- olditem = items.get(block=False)
- except queue.Empty:
- if not (self._closed or self._receiveclosed.isSet()):
- _callbacks[self.id] = (callback, endmarker)
- break
- else:
- if olditem is ENDMARKER:
- items.put(olditem) # for other receivers
- if endmarker is not NO_ENDMARKER_WANTED:
- callback(endmarker)
- break
- else:
- callback(olditem)
- finally:
- _receivelock.release()
-
- def __repr__(self):
- flag = self.isclosed() and "closed" or "open"
- return "<Channel id=%d %s>" % (self.id, flag)
-
- def __del__(self):
- if self.gateway is None: # can be None in tests
- return
- self.gateway._trace("Channel(%d).__del__" % self.id)
- # no multithreading issues here, because we have the last ref to 'self'
- if self._closed:
- # state transition "closed" --> "deleted"
- for error in self._remoteerrors:
- error.warn()
- elif self._receiveclosed.isSet():
- # state transition "sendonly" --> "deleted"
- # the remote channel is already in "deleted" state, nothing to do
- pass
- else:
- # state transition "opened" --> "deleted"
- if self._items is None: # has_callback
- Msg = Message.CHANNEL_LAST_MESSAGE
- else:
- Msg = Message.CHANNEL_CLOSE
- self.gateway._send(Msg(self.id))
-
- def _getremoteerror(self):
- try:
- return self._remoteerrors.pop(0)
- except IndexError:
- return None
-
- #
- # public API for channel objects
- #
- def isclosed(self):
- """ return True if the channel is closed. A closed
- channel may still hold items.
- """
- return self._closed
-
- def makefile(self, mode='w', proxyclose=False):
- """ return a file-like object.
- mode: 'w' for writes, 'r' for reads
- proxyclose: if true file.close() will
- trigger a channel.close() call.
- """
- if mode == "w":
- return ChannelFileWrite(channel=self, proxyclose=proxyclose)
- elif mode == "r":
- return ChannelFileRead(channel=self, proxyclose=proxyclose)
- raise ValueError("mode %r not availabe" %(mode,))
-
- def close(self, error=None):
- """ close down this channel on both sides. """
- if not self._closed:
- # state transition "opened/sendonly" --> "closed"
- # threads warning: the channel might be closed under our feet,
- # but it's never damaging to send too many CHANNEL_CLOSE messages
- put = self.gateway._send
- if error is not None:
- put(Message.CHANNEL_CLOSE_ERROR(self.id, error))
- else:
- put(Message.CHANNEL_CLOSE(self.id))
- if isinstance(error, RemoteError):
- self._remoteerrors.append(error)
- self._closed = True # --> "closed"
- self._receiveclosed.set()
- queue = self._items
- if queue is not None:
- queue.put(ENDMARKER)
- self.gateway._channelfactory._no_longer_opened(self.id)
-
- def waitclose(self, timeout=None):
- """ wait until this channel is closed (or the remote side
- otherwise signalled that no more data was being sent).
- The channel may still hold receiveable items, but not receive
- more. waitclose() reraises exceptions from executing code on
- the other side as channel.RemoteErrors containing a a textual
- representation of the remote traceback.
- """
- self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
- if not self._receiveclosed.isSet():
- raise IOError("Timeout")
- error = self._getremoteerror()
- if error:
- raise error
-
- def send(self, item):
- """sends the given item to the other side of the channel,
- possibly blocking if the sender queue is full.
- Note that an item needs to be marshallable.
- """
- if self.isclosed():
- raise IOError("cannot send to %r" %(self,))
- if isinstance(item, Channel):
- data = Message.CHANNEL_NEW(self.id, item.id)
- else:
- data = Message.CHANNEL_DATA(self.id, item)
- self.gateway._send(data)
-
- def receive(self):
- """receives an item that was sent from the other side,
- possibly blocking if there is none.
- Note that exceptions from the other side will be
- reraised as channel.RemoteError exceptions containing
- a textual representation of the remote traceback.
- """
- queue = self._items
- if queue is None:
- raise IOError("calling receive() on channel with receiver callback")
- x = queue.get()
- if x is ENDMARKER:
- queue.put(x) # for other receivers
- raise self._getremoteerror() or EOFError()
- else:
- return x
-
- def __iter__(self):
- return self
-
- def next(self):
- try:
- return self.receive()
- except EOFError:
- raise StopIteration
- __next__ = next
-
-ENDMARKER = object()
-
-class ChannelFactory(object):
- RemoteError = RemoteError
-
- def __init__(self, gateway, startcount=1):
- self._channels = weakref.WeakValueDictionary()
- self._callbacks = {}
- self._writelock = threading.Lock()
- self.gateway = gateway
- self.count = startcount
- self.finished = False
-
- def new(self, id=None):
- """ create a new Channel with 'id' (or create new id if None). """
- self._writelock.acquire()
- try:
- if self.finished:
- raise IOError("connexion already closed: %s" % (self.gateway,))
- if id is None:
- id = self.count
- self.count += 2
- channel = Channel(self.gateway, id)
- self._channels[id] = channel
- return channel
- finally:
- self._writelock.release()
-
- def channels(self):
- return list(self._channels.values())
-
- #
- # internal methods, called from the receiver thread
- #
- def _no_longer_opened(self, id):
- try:
- del self._channels[id]
- except KeyError:
- pass
- try:
- callback, endmarker = self._callbacks.pop(id)
- except KeyError:
- pass
- else:
- if endmarker is not NO_ENDMARKER_WANTED:
- callback(endmarker)
-
- def _local_close(self, id, remoteerror=None, sendonly=False):
- channel = self._channels.get(id)
- if channel is None:
- # channel already in "deleted" state
- if remoteerror:
- remoteerror.warn()
- else:
- # state transition to "closed" state
- if remoteerror:
- channel._remoteerrors.append(remoteerror)
- if not sendonly: # otherwise #--> "sendonly"
- channel._closed = True # --> "closed"
- channel._receiveclosed.set()
- queue = channel._items
- if queue is not None:
- queue.put(ENDMARKER)
- self._no_longer_opened(id)
-
- def _local_receive(self, id, data):
- # executes in receiver thread
- try:
- callback, endmarker = self._callbacks[id]
- except KeyError:
- channel = self._channels.get(id)
- queue = channel and channel._items
- if queue is None:
- pass # drop data
- else:
- queue.put(data)
- else:
- callback(data) # even if channel may be already closed
-
- def _finished_receiving(self):
- self._writelock.acquire()
- try:
- self.finished = True
- finally:
- self._writelock.release()
- for id in list(self._channels):
- self._local_close(id, sendonly=True)
- for id in list(self._callbacks):
- self._no_longer_opened(id)
-
-class ChannelFile(object):
- def __init__(self, channel, proxyclose=True):
- self.channel = channel
- self._proxyclose = proxyclose
-
- def close(self):
- if self._proxyclose:
- self.channel.close()
-
- def __repr__(self):
- state = self.channel.isclosed() and 'closed' or 'open'
- return '<ChannelFile %d %s>' %(self.channel.id, state)
-
-class ChannelFileWrite(ChannelFile):
- def write(self, out):
- self.channel.send(out)
-
- def flush(self):
- pass
-
-class ChannelFileRead(ChannelFile):
- def __init__(self, channel, proxyclose=True):
- super(ChannelFileRead, self).__init__(channel, proxyclose)
- self._buffer = ""
-
- def read(self, n):
- while len(self._buffer) < n:
- try:
- self._buffer += self.channel.receive()
- except EOFError:
- self.close()
- break
- ret = self._buffer[:n]
- self._buffer = self._buffer[n:]
- return ret
-
- def readline(self):
- i = self._buffer.find("\n")
- if i != -1:
- return self.read(i+1)
- line = self.read(len(self._buffer)+1)
- while line and line[-1] != "\n":
- c = self.read(1)
- if not c:
- break
- line += c
- return line
-
-class BaseGateway(object):
- exc_info = sys.exc_info
-
- class _StopExecLoop(Exception):
- pass
-
- def __init__(self, io, _startcount=2):
- """ initialize core gateway, using the given inputoutput object.
- """
- self._io = io
- self._channelfactory = ChannelFactory(self, _startcount)
- self._receivelock = threading.RLock()
-
- def _initreceive(self):
- self._receiverthread = threading.Thread(name="receiver",
- target=self._thread_receiver)
- self._receiverthread.setDaemon(1)
- self._receiverthread.start()
-
- def _trace(self, msg):
- if debug:
- try:
- debug.write(unicode(msg) + "\n")
- debug.flush()
- except sysex:
- raise
- except:
- sys.stderr.write("exception during tracing\n")
-
- def _thread_receiver(self):
- """ thread to read and handle Messages half-sync-half-async. """
- self._trace("starting to receive")
- try:
- while 1:
- try:
- msg = Message.readfrom(self._io)
- self._trace("received <- %r" % msg)
- _receivelock = self._receivelock
- _receivelock.acquire()
- try:
- msg.received(self)
- finally:
- _receivelock.release()
- except sysex:
- break
- except EOFError:
- break
- except:
- self._trace(geterrortext(self.exc_info()))
- break
- finally:
- # XXX we need to signal fatal error states to
- # channels/callbacks, particularly ones
- # where the other side just died.
- self._stopexec()
- try:
- self._stopsend()
- except IOError:
- self._trace('IOError on _stopsend()')
- self._channelfactory._finished_receiving()
- if threading: # might be None during shutdown/finalization
- self._trace('leaving %r' % threading.currentThread())
-
- def _send(self, msg):
- if msg is None:
- self._io.close_write()
- else:
- try:
- msg.writeto(self._io)
- except:
- excinfo = self.exc_info()
- self._trace(geterrortext(excinfo))
- else:
- self._trace('sent -> %r' % msg)
-
- def _stopsend(self):
- self._send(None)
-
- def _stopexec(self):
- pass
-
- def _local_schedulexec(self, channel, sourcetask):
- channel.close("execution disallowed")
-
- # _____________________________________________________________________
- #
- # High Level Interface
- # _____________________________________________________________________
- #
- def newchannel(self):
- """ return new channel object. """
- return self._channelfactory.new()
-
- def join(self, joinexec=True):
- """ Wait for all IO (and by default all execution activity)
- to stop. the joinexec parameter is obsolete.
- """
- current = threading.currentThread()
- if self._receiverthread.isAlive():
- self._trace("joining receiver thread")
- self._receiverthread.join()
-
-class SlaveGateway(BaseGateway):
- def _stopexec(self):
- self._execqueue.put(None)
-
- def _local_schedulexec(self, channel, sourcetask):
- self._execqueue.put((channel, sourcetask))
-
- def serve(self, joining=True):
- self._execqueue = queue.Queue()
- self._initreceive()
- try:
- while 1:
- item = self._execqueue.get()
- if item is None:
- self._stopsend()
- break
- try:
- self.executetask(item)
- except self._StopExecLoop:
- break
- finally:
- self._trace("serve")
- if joining:
- self.join()
-
- def executetask(self, item):
- """ execute channel/source items. """
- channel, source = item
- try:
- loc = { 'channel' : channel, '__name__': '__channelexec__'}
- #open("task.py", 'w').write(source)
- self._trace("execution starts: %s" % repr(source)[:50])
- try:
- co = compile(source+'\n', '', 'exec')
- do_exec(co, loc)
- finally:
- self._trace("execution finished")
- except sysex:
- pass
- except self._StopExecLoop:
- channel.close()
- raise
- except:
- excinfo = self.exc_info()
- self._trace("got exception %s" % excinfo[1])
- errortext = geterrortext(excinfo)
- channel.close(errortext)
- else:
- channel.close()
-
--- a/example/execnet/redirect_remote_output.py
+++ b/example/execnet/redirect_remote_output.py
@@ -10,7 +10,7 @@ showcasing features of the channel objec
import py
-gw = py.execnet.PopenGateway()
+gw = execnet.PopenGateway()
outchan = gw.remote_exec("""
import sys
--- a/py/execnet/script/socketserverservice.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""
-A windows service wrapper for the py.execnet socketserver.
-
-To use, run:
- python socketserverservice.py register
- net start ExecNetSocketServer
-"""
-
-import sys
-import os
-import time
-import win32serviceutil
-import win32service
-import win32event
-import win32evtlogutil
-import servicemanager
-import threading
-import socketserver
-
-
-appname = 'ExecNetSocketServer'
-
-
-class SocketServerService(win32serviceutil.ServiceFramework):
- _svc_name_ = appname
- _svc_display_name_ = "%s" % appname
- _svc_deps_ = ["EventLog"]
- def __init__(self, args):
- # The exe-file has messages for the Event Log Viewer.
- # Register the exe-file as event source.
- #
- # Probably it would be better if this is done at installation time,
- # so that it also could be removed if the service is uninstalled.
- # Unfortunately it cannot be done in the 'if __name__ == "__main__"'
- # block below, because the 'frozen' exe-file does not run this code.
- #
- win32evtlogutil.AddSourceToRegistry(self._svc_display_name_,
- servicemanager.__file__,
- "Application")
- win32serviceutil.ServiceFramework.__init__(self, args)
- self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
- self.WAIT_TIME = 1000 # in milliseconds
-
-
- def SvcStop(self):
- self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
- win32event.SetEvent(self.hWaitStop)
-
-
- def SvcDoRun(self):
- # Redirect stdout and stderr to prevent "IOError: [Errno 9]
- # Bad file descriptor". Windows services don't have functional
- # output streams.
- sys.stdout = sys.stderr = open('nul', 'w')
-
- # Write a 'started' event to the event log...
- win32evtlogutil.ReportEvent(self._svc_display_name_,
- servicemanager.PYS_SERVICE_STARTED,
- 0, # category
- servicemanager.EVENTLOG_INFORMATION_TYPE,
- (self._svc_name_, ''))
- print("Begin: %s" % (self._svc_display_name_))
-
- hostport = ':8888'
- print('Starting py.execnet SocketServer on %s' % hostport)
- serversock = socketserver.bind_and_listen(hostport)
- thread = threading.Thread(target=socketserver.startserver,
- args=(serversock,),
- kwargs={'loop':True})
- thread.setDaemon(True)
- thread.start()
-
- # wait to be stopped or self.WAIT_TIME to pass
- while True:
- result = win32event.WaitForSingleObject(self.hWaitStop,
- self.WAIT_TIME)
- if result == win32event.WAIT_OBJECT_0:
- break
-
- # write a 'stopped' event to the event log.
- win32evtlogutil.ReportEvent(self._svc_display_name_,
- servicemanager.PYS_SERVICE_STOPPED,
- 0, # category
- servicemanager.EVENTLOG_INFORMATION_TYPE,
- (self._svc_name_, ''))
- print("End: %s" % appname)
-
-
-if __name__ == '__main__':
- # Note that this code will not be run in the 'frozen' exe-file!!!
- win32serviceutil.HandleCommandLine(SocketServerService)
--- a/py/execnet/__init__.py
+++ /dev/null
@@ -1,1 +0,0 @@
-""" ad-hoc networking mechanism """
--- a/py/execnet/script/quitserver.py
+++ /dev/null
@@ -1,16 +0,0 @@
-"""
-
- send a "quit" signal to a remote server
-
-"""
-
-import sys
-import socket
-
-hostport = sys.argv[1]
-host, port = hostport.split(':')
-hostport = (host, int(port))
-
-sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-sock.connect(hostport)
-sock.sendall('"raise KeyboardInterrupt"\n')
--- a/example/execnet/sysinfo.py
+++ b/example/execnet/sysinfo.py
@@ -95,7 +95,7 @@ def error(*args):
def getinfo(sshname, ssh_config=None, loginfo=sys.stdout):
debug("connecting to", sshname)
try:
- gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config)
+ gw = execnet.SshGateway(sshname, ssh_config=ssh_config)
except IOError:
error("could not get sshagteway", sshname)
else:
--- a/py/test/plugin/pytest_execnetcleanup.py
+++ /dev/null
@@ -1,41 +0,0 @@
-"""
-cleanup execnet gateways during test function runs.
-"""
-import py
-
-pytest_plugins = "xfail"
-
-def pytest_configure(config):
- config.pluginmanager.register(Execnetcleanup())
-
-class Execnetcleanup:
- _gateways = None
- def __init__(self, debug=False):
- self._debug = debug
-
- def pyexecnet_gateway_init(self, gateway):
- if self._gateways is not None:
- self._gateways.append(gateway)
-
- def pyexecnet_gateway_exit(self, gateway):
- if self._gateways is not None:
- self._gateways.remove(gateway)
-
- def pytest_sessionstart(self, session):
- self._gateways = []
-
- def pytest_sessionfinish(self, session, exitstatus):
- l = []
- for gw in self._gateways:
- gw.exit()
- l.append(gw)
- #for gw in l:
- # gw.join()
-
- def pytest_pyfunc_call(self, __multicall__, pyfuncitem):
- if self._gateways is not None:
- gateways = self._gateways[:]
- res = __multicall__.execute()
- while len(self._gateways) > len(gateways):
- self._gateways[-1].exit()
- return res
--- a/py/execnet/rsync_remote.py
+++ /dev/null
@@ -1,92 +0,0 @@
-def f():
- import os, stat, shutil
- try:
- from hashlib import md5
- except ImportError:
- from md5 import md5
- destdir, options = channel.receive()
- modifiedfiles = []
-
- def remove(path):
- assert path.startswith(destdir)
- try:
- os.unlink(path)
- except OSError:
- # assume it's a dir
- shutil.rmtree(path)
-
- def receive_directory_structure(path, relcomponents):
- try:
- st = os.lstat(path)
- except OSError:
- st = None
- msg = channel.receive()
- if isinstance(msg, list):
- if st and not stat.S_ISDIR(st.st_mode):
- os.unlink(path)
- st = None
- if not st:
- os.makedirs(path)
- entrynames = {}
- for entryname in msg:
- receive_directory_structure(os.path.join(path, entryname),
- relcomponents + [entryname])
- entrynames[entryname] = True
- if options.get('delete'):
- for othername in os.listdir(path):
- if othername not in entrynames:
- otherpath = os.path.join(path, othername)
- remove(otherpath)
- elif msg is not None:
- checksum = None
- if st:
- if stat.S_ISREG(st.st_mode):
- msg_mtime, msg_size = msg
- if msg_size != st.st_size:
- pass
- elif msg_mtime != st.st_mtime:
- f = open(path, 'rb')
- checksum = md5(f.read()).digest()
- f.close()
- else:
- return # already fine
- else:
- remove(path)
- channel.send(("send", (relcomponents, checksum)))
- modifiedfiles.append((path, msg))
- receive_directory_structure(destdir, [])
-
- STRICT_CHECK = False # seems most useful this way for py.test
- channel.send(("list_done", None))
-
- for path, (time, size) in modifiedfiles:
- data = channel.receive()
- channel.send(("ack", path[len(destdir) + 1:]))
- if data is not None:
- if STRICT_CHECK and len(data) != size:
- raise IOError('file modified during rsync: %r' % (path,))
- f = open(path, 'wb')
- f.write(data)
- f.close()
- try:
- os.utime(path, (time, time))
- except OSError:
- pass
- del data
- channel.send(("links", None))
-
- msg = channel.receive()
- while msg is not 42:
- # we get symlink
- _type, relpath, linkpoint = msg
- assert _type == "link"
- path = os.path.join(destdir, relpath)
- try:
- remove(path)
- except OSError:
- pass
-
- os.symlink(os.path.join(destdir, linkpoint), path)
- msg = channel.receive()
- channel.send(("done", None))
-
--- a/example/funcarg/mysetup2/conftest.py
+++ b/example/funcarg/mysetup2/conftest.py
@@ -20,5 +20,5 @@ class MySetup:
host = self.config.option.ssh
if host is None:
py.test.skip("specify ssh host with --ssh")
- return py.execnet.SshGateway(host)
+ return execnet.SshGateway(host)
--- a/contrib/sysinfo.py
+++ b/contrib/sysinfo.py
@@ -95,7 +95,7 @@ def error(*args):
def getinfo(sshname, ssh_config=None, loginfo=sys.stdout):
debug("connecting to", sshname)
try:
- gw = py.execnet.SshGateway(sshname, ssh_config=ssh_config)
+ gw = execnet.SshGateway(sshname, ssh_config=ssh_config)
except IOError:
error("could not get sshagteway", sshname)
else:
--- a/doc/execnet.txt
+++ b/doc/execnet.txt
@@ -2,263 +2,11 @@
py.execnet: *elastic* distributed programming
==============================================================================
-``execnet`` helps you to:
+Since pylib 1.1 "py.execnet" is separated out of hte lib and now
+available through the standalone `execnet standalone package`_.
-* ad-hoc instantiate local or remote Python Processes
-* send code for execution in one or many processes
-* send and receive data between processes through channels
+If you have usages of the "py.execnet.*" 1.0 API you can likely
+rename all occurences of the string ``py.execnet.`` with the
+string ``execnet.``.
-One of it's unique features is that it uses a **zero-install**
-technique: no manual installation steps are required on
-remote places, only a basic working Python interpreter
-and some input/output connection to it.
-
-There is a `EuroPython2009 talk`_ from July 2009 with
-examples and some pictures.
-
-.. contents::
- :local:
- :depth: 2
-
-.. _`EuroPython2009 talk`: http://codespeak.net/download/py/ep2009-execnet.pdf
-
-Gateways: immediately spawn local or remote process
-===================================================
-
-In order to send code to a remote place or a subprocess
-you need to instantiate a so-called Gateway object.
-There are currently three Gateway classes:
-
-* :api:`py.execnet.PopenGateway` to open a subprocess
- on the local machine. Useful for making use
- of multiple processors to to contain code execution
- in a separated environment.
-
-* :api:`py.execnet.SshGateway` to connect to
- a remote ssh server and distribute execution to it.
-
-* :api:`py.execnet.SocketGateway` a way to connect to
- a remote Socket based server. *Note* that this method
- requires a manually started
- :source:py/execnet/script/socketserver.py
- script. You can run this "server script" without
- having the py lib installed on the remote system
- and you can setup it up as permanent service.
-
-
-remote_exec: execute source code remotely
-===================================================
-
-All gateways offer remote code execution via this high level function::
-
- def remote_exec(source):
- """return channel object for communicating with the asynchronously
- executing 'source' code which will have a corresponding 'channel'
- object in its executing namespace."""
-
-With `remote_exec` you send source code to the other
-side and get both a local and a remote Channel_ object,
-which you can use to have the local and remote site
-communicate data in a structured way. Here is
-an example for reading the PID::
-
- >>> import py
- >>> gw = py.execnet.PopenGateway()
- >>> channel = gw.remote_exec("""
- ... import os
- ... channel.send(os.getpid())
- ... """)
- >>> remote_pid = channel.receive()
- >>> remote_pid != py.std.os.getpid()
- True
-
-.. _`Channel`:
-.. _`channel-api`:
-.. _`exchange data`:
-
-Channels: bidirectionally exchange data between hosts
-=======================================================
-
-A channel object allows to send and receive data between
-two asynchronously running programs. When calling
-`remote_exec` you will get a channel object back and
-the code fragment running on the other side will
-see a channel object in its global namespace.
-
-Here is the interface of channel objects::
-
- #
- # API for sending and receiving anonymous values
- #
- channel.send(item):
- sends the given item to the other side of the channel,
- possibly blocking if the sender queue is full.
- Note that items need to be marshallable (all basic
- python types are).
-
- channel.receive():
- receives an item that was sent from the other side,
- possibly blocking if there is none.
- Note that exceptions from the other side will be
- reraised as gateway.RemoteError exceptions containing
- a textual representation of the remote traceback.
-
- channel.waitclose(timeout=None):
- wait until this channel is closed. Note that a closed
- channel may still hold items that will be received or
- send. Note that exceptions from the other side will be
- reraised as gateway.RemoteError exceptions containing
- a textual representation of the remote traceback.
-
- channel.close():
- close this channel on both the local and the remote side.
- A remote side blocking on receive() on this channel
- will get woken up and see an EOFError exception.
-
-
-.. _xspec:
-
-
-XSpec: string specification for gateway type and configuration
-===============================================================
-
-``py.execnet`` supports a simple extensible format for
-specifying and configuring Gateways for remote execution.
-You can use a string specification to instantiate a new gateway,
-for example a new SshGateway::
-
- gateway = py.execnet.makegateway("ssh=myhost")
-
-Let's look at some examples for valid specifications.
-Specification for an ssh connection to `wyvern`, running on python2.4 in the (newly created) 'mycache' subdirectory::
-
- ssh=wyvern//python=python2.4//chdir=mycache
-
-Specification of a python2.5 subprocess; with a low CPU priority ("nice" level). Current dir will be the current dir of the instantiator (that's true for all 'popen' specifications unless they specify 'chdir')::
-
- popen//python=2.5//nice=20
-
-Specification of a Python Socket server process that listens on 192.168.1.4:8888; current dir will be the 'pyexecnet-cache' sub directory which is used a default for all remote processes::
-
- socket=192.168.1.4:8888
-
-More generally, a specification string has this general format::
-
- key1=value1//key2=value2//key3=value3
-
-If you omit a value, a boolean true value is assumed. Currently
-the following key/values are supported:
-
-* ``popen`` for a PopenGateway
-* ``ssh=host`` for a SshGateway
-* ``socket=address:port`` for a SocketGateway
-* ``python=executable`` for specifying Python executables
-* ``chdir=path`` change remote working dir to given relative or absolute path
-* ``nice=value`` decrease remote nice level if platforms supports it
-
-
-Examples of py.execnet usage
-===============================================================
-
-Compare cwd() of Popen Gateways
-----------------------------------------
-
-A PopenGateway has the same working directory as the instantiatior::
-
- >>> import py, os
- >>> gw = py.execnet.PopenGateway()
- >>> ch = gw.remote_exec("import os; channel.send(os.getcwd())")
- >>> res = ch.receive()
- >>> assert res == os.getcwd()
- >>> gw.exit()
-
-Synchronously receive results from two sub processes
------------------------------------------------------
-
-Use MultiChannels for receiving multiple results from remote code::
-
- >>> import py
- >>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)")
- >>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)")
- >>> mch = py.execnet.MultiChannel([ch1, ch2])
- >>> l = mch.receive_each()
- >>> assert len(l) == 2
- >>> assert 1 in l
- >>> assert 2 in l
-
-Asynchronously receive results from two sub processes
------------------------------------------------------
-
-Use ``MultiChannel.make_receive_queue()`` for asynchronously receiving
-multiple results from remote code. This standard Queue provides
-``(channel, result)`` tuples which allows to determine where
-a result comes from::
-
- >>> import py
- >>> ch1 = py.execnet.PopenGateway().remote_exec("channel.send(1)")
- >>> ch2 = py.execnet.PopenGateway().remote_exec("channel.send(2)")
- >>> mch = py.execnet.MultiChannel([ch1, ch2])
- >>> queue = mch.make_receive_queue()
- >>> chan1, res1 = queue.get() # you may also specify a timeout
- >>> chan2, res2 = queue.get()
- >>> res1 + res2
- 3
- >>> assert chan1 in (ch1, ch2)
- >>> assert chan2 in (ch1, ch2)
- >>> assert chan1 != chan2
-
-Receive file contents from remote SSH account
------------------------------------------------------
-
-Here is a small program that you can use to retrieve
-contents of remote files::
-
- import py
- # open a gateway to a fresh child process
- gw = py.execnet.SshGateway('codespeak.net')
- channel = gw.remote_exec("""
- for fn in channel:
- f = open(fn, 'rb')
- channel.send(f.read())
- f.close()
- """)
-
- for fn in somefilelist:
- channel.send(fn)
- content = channel.receive()
- # process content
-
- # later you can exit / close down the gateway
- gw.exit()
-
-
-Instantiate a socket server in a new subprocess
------------------------------------------------------
-
-The following example opens a PopenGateway, i.e. a python
-child process, and starts a socket server within that process
-and then opens a second gateway to the freshly started
-socketserver::
-
- import py
-
- popengw = py.execnet.PopenGateway()
- socketgw = py.execnet.SocketGateway.new_remote(popengw, ("127.0.0.1", 0))
-
- print socketgw._rinfo() # print some info about the remote environment
-
-
-Sending a module / checking if run through remote_exec
---------------------------------------------------------------
-
-You can pass a module object to ``remote_exec`` in which case
-its source code will be sent. No dependencies will be transferred
-so the module must be self-contained or only use modules that are
-installed on the "other" side. Module code can detect if it is
-running in a remote_exec situation by checking for the special
-``__name__`` attribute like this::
-
- if __name__ == '__channelexec__':
- # ... call module functions ...
-
-
+.. _`execnet standalone package`: http://codespeak.net/execnet
More information about the pytest-commit
mailing list