[py-svn] r63089 - in py/trunk/py: . execnet

hpk at codespeak.net hpk at codespeak.net
Thu Mar 19 18:05:42 CET 2009


Author: hpk
Date: Thu Mar 19 18:05:41 2009
New Revision: 63089

Added:
   py/trunk/py/execnet/multi.py
      - copied, changed from r63084, py/trunk/py/execnet/gwmanage.py
Modified:
   py/trunk/py/__init__.py
   py/trunk/py/execnet/gwmanage.py
Log:
factor out MultiChannel and MultiGateway helpers


Modified: py/trunk/py/__init__.py
==============================================================================
--- py/trunk/py/__init__.py	(original)
+++ py/trunk/py/__init__.py	Thu Mar 19 18:05:41 2009
@@ -152,8 +152,8 @@
     'execnet.PopenGateway'   : ('./execnet/register.py', 'PopenGateway'),
     'execnet.SshGateway'     : ('./execnet/register.py', 'SshGateway'),
     'execnet.GatewaySpec'    : ('./execnet/gwmanage.py', 'GatewaySpec'),
-    'execnet.MultiGateway'   : ('./execnet/gwmanage.py', 'MultiGateway'),
-    'execnet.MultiChannel'   : ('./execnet/gwmanage.py', 'MultiChannel'),
+    'execnet.MultiGateway'   : ('./execnet/multi.py', 'MultiGateway'),
+    'execnet.MultiChannel'   : ('./execnet/multi.py', 'MultiChannel'),
 
     # execnet scripts
     'execnet.RSync'          : ('./execnet/rsync.py', 'RSync'),

Modified: py/trunk/py/execnet/gwmanage.py
==============================================================================
--- py/trunk/py/execnet/gwmanage.py	(original)
+++ py/trunk/py/execnet/gwmanage.py	Thu Mar 19 18:05:41 2009
@@ -94,64 +94,6 @@
         gw.spec = self
         return gw 
 
-class MultiChannel:
-    def __init__(self, channels):
-        self._channels = channels
-
-    def send_each(self, item):
-        for ch in self._channels:
-            ch.send(item)
-
-    def receive_each(self, withchannel=False):
-        assert not hasattr(self, '_queue')
-        l = []
-        for ch in self._channels:
-            obj = ch.receive()
-            if withchannel:
-                l.append((ch, obj))
-            else:
-                l.append(obj)
-        return l 
-
-    def make_receive_queue(self, endmarker=NO_ENDMARKER_WANTED):
-        try:
-            return self._queue
-        except AttributeError:
-            self._queue = py.std.Queue.Queue()
-            for ch in self._channels:
-                def putreceived(obj, channel=ch):
-                    self._queue.put((channel, obj))
-                if endmarker is NO_ENDMARKER_WANTED:
-                    ch.setcallback(putreceived)
-                else:
-                    ch.setcallback(putreceived, endmarker=endmarker)
-            return self._queue
-
-
-    def waitclose(self):
-        first = None
-        for ch in self._channels:
-            try:
-                ch.waitclose()
-            except ch.RemoteError:
-                if first is None:
-                    first = py.std.sys.exc_info()
-        if first:
-            raise first[0], first[1], first[2]
-
-class MultiGateway:
-    RemoteError = RemoteError
-    def __init__(self, gateways):
-        self.gateways = gateways
-    def remote_exec(self, source):
-        channels = []
-        for gw in self.gateways:
-            channels.append(gw.remote_exec(source))
-        return MultiChannel(channels)
-    def exit(self):
-        for gw in self.gateways:
-            gw.exit()
-
 class GatewayManager:
     RemoteError = RemoteError
 
@@ -179,7 +121,7 @@
             else:
                 if remote:
                     l.append(gw)
-        return MultiGateway(gateways=l)
+        return py.execnet.MultiGateway(gateways=l)
 
     def multi_exec(self, source, inplacelocal=True):
         """ remote execute code on all gateways. 

Copied: py/trunk/py/execnet/multi.py (from r63084, py/trunk/py/execnet/gwmanage.py)
==============================================================================
--- py/trunk/py/execnet/gwmanage.py	(original)
+++ py/trunk/py/execnet/multi.py	Thu Mar 19 18:05:41 2009
@@ -1,98 +1,24 @@
 """
-    instantiating, managing and rsyncing to hosts
-
-Host specification strings and implied gateways:
-
-    socket:hostname:port:path SocketGateway
-    popen[-executable][:path]          PopenGateway
-    [ssh:]spec:path           SshGateway
-    * [SshGateway]
-
-on hostspec.makeconnection() a Host object
-will be created which has an instantiated gateway. 
-the remote side will be chdir()ed to the specified path. 
-if no path was specified, do no chdir() at all. 
-
+    Working with multiple channels and gateways
 
 """
 import py
-import sys, os
-from py.__.test.dsession.masterslave import MasterNode
-from py.__.test import event
 from py.__.execnet.channel import RemoteError
 
 NO_ENDMARKER_WANTED = object()
 
-class GatewaySpec(object):
-    python = None
-    def __init__(self, spec, defaultjoinpath="pyexecnetcache"):
-        if spec == "popen" or spec.startswith("popen:"):
-            parts = spec.split(":", 2)
-            self.type = self.address = parts.pop(0)
-            if parts:
-                python = parts.pop(0)
-                # XXX XXX XXX do better GWSPEC that can deal
-                # with "C:" 
-                if py.std.sys.platform == "win32" and len(python) == 1:
-                    python = "%s:%s" %(python, parts.pop(0))
-                self.python = python
-            if parts:
-                self.joinpath = parts.pop(0)
-            else:
-                self.joinpath = ""
-            if not self.python:
-                self.python = py.std.sys.executable
-
-        elif spec.startswith("socket:"):
-            parts = spec[7:].split(":", 2)
-            self.address = parts.pop(0)
-            if parts:
-                port = int(parts.pop(0))
-                self.address = self.address, port
-            self.joinpath = parts and parts.pop(0) or ""
-            self.type = "socket"
-        else:
-            if spec.startswith("ssh:"):
-                spec = spec[4:]
-            parts = spec.split(":", 2)
-            self.address = parts.pop(0)
-            self.python = parts and parts.pop(0) or "python"
-            self.joinpath = parts and parts.pop(0) or ""
-            self.type = "ssh"
-        if not self.joinpath and not self.inplacelocal():
-            self.joinpath = defaultjoinpath
-
-    def inplacelocal(self):
-        return bool(self.type == "popen" and not self.joinpath)
-
-    def __str__(self):
-        return "<GatewaySpec %s:%s>" % (self.address, self.joinpath)
-    __repr__ = __str__
-
-    def makegateway(self, waitclose=True):
-        if self.type == "popen":
-            gw = py.execnet.PopenGateway(python=self.python)
-        elif self.type == "socket":
-            gw = py.execnet.SocketGateway(*self.address)
-        elif self.type == "ssh":
-            gw = py.execnet.SshGateway(self.address, remotepython=self.python)
-        if self.joinpath:
-            channel = gw.remote_exec("""
-                import os 
-                path = %r
-                try:
-                    os.chdir(path)
-                except OSError:
-                    os.mkdir(path)
-                    os.chdir(path)
-            """ % self.joinpath)
-            if waitclose:
-                channel.waitclose()
-        else:
-            if waitclose:
-                gw.remote_exec("").waitclose()
-        gw.spec = self
-        return gw 
+class MultiGateway:
+    RemoteError = RemoteError
+    def __init__(self, gateways):
+        self.gateways = gateways
+    def remote_exec(self, source):
+        channels = []
+        for gw in self.gateways:
+            channels.append(gw.remote_exec(source))
+        return MultiChannel(channels)
+    def exit(self):
+        for gw in self.gateways:
+            gw.exit()
 
 class MultiChannel:
     def __init__(self, channels):
@@ -139,119 +65,4 @@
         if first:
             raise first[0], first[1], first[2]
 
-class MultiGateway:
-    RemoteError = RemoteError
-    def __init__(self, gateways):
-        self.gateways = gateways
-    def remote_exec(self, source):
-        channels = []
-        for gw in self.gateways:
-            channels.append(gw.remote_exec(source))
-        return MultiChannel(channels)
-    def exit(self):
-        for gw in self.gateways:
-            gw.exit()
-
-class GatewayManager:
-    RemoteError = RemoteError
-
-    def __init__(self, specs):
-        self.specs = [GatewaySpec(spec) for spec in specs]
-        self.gateways = []
-
-    def trace(self, msg):
-        py._com.pyplugins.notify("trace", "gatewaymanage", msg)
-
-    def makegateways(self):
-        assert not self.gateways
-        for spec in self.specs:
-            self.trace("makegateway %s" %(spec))
-            self.gateways.append(spec.makegateway())
-
-    def getgateways(self, remote=True, inplacelocal=True):
-        if not self.gateways and self.specs:
-            self.makegateways()
-        l = []
-        for gw in self.gateways:
-            if gw.spec.inplacelocal():
-                if inplacelocal:
-                    l.append(gw)
-            else:
-                if remote:
-                    l.append(gw)
-        return MultiGateway(gateways=l)
-
-    def multi_exec(self, source, inplacelocal=True):
-        """ remote execute code on all gateways. 
-            @param inplacelocal=False: don't send code to inplacelocal hosts. 
-        """
-        multigw = self.getgateways(inplacelocal=inplacelocal)
-        return multigw.remote_exec(source)
-
-    def multi_chdir(self, basename, inplacelocal=True):
-        """ perform a remote chdir to the given path, may be relative. 
-            @param inplacelocal=False: don't send code to inplacelocal hosts. 
-        """ 
-        self.multi_exec("import os ; os.chdir(%r)" % basename, 
-                        inplacelocal=inplacelocal).waitclose()
-
-    def rsync(self, source, notify=None, verbose=False, ignores=None):
-        """ perform rsync to all remote hosts. 
-        """ 
-        rsync = HostRSync(source, verbose=verbose, ignores=ignores)
-        added = False
-        for gateway in self.gateways:
-            spec = gateway.spec
-            if not spec.inplacelocal():
-                self.trace("add_target_host %r" %(gateway,))
-                def finished():
-                    if notify:
-                        notify("rsyncrootready", spec, source)
-                rsync.add_target_host(gateway, finished=finished)
-                added = True
-        if added:
-            self.trace("rsyncing %r" % source)
-            rsync.send()
-            self.trace("rsyncing %r finished" % source)
-        else:
-            self.trace("rsync: nothing to do.")
-
-    def exit(self):
-        while self.gateways:
-            gw = self.gateways.pop()
-            self.trace("exiting gateway %s" % gw)
-            gw.exit()
 
-class HostRSync(py.execnet.RSync):
-    """ RSyncer that filters out common files 
-    """
-    def __init__(self, sourcedir, *args, **kwargs):
-        self._synced = {}
-        ignores= None
-        if 'ignores' in kwargs:
-            ignores = kwargs.pop('ignores')
-        self._ignores = ignores or []
-        super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
-
-    def filter(self, path):
-        path = py.path.local(path)
-        if not path.ext in ('.pyc', '.pyo'):
-            if not path.basename.endswith('~'): 
-                if path.check(dotfile=0):
-                    for x in self._ignores:
-                        if path == x:
-                            break
-                    else:
-                        return True
-
-    def add_target_host(self, gateway, finished=None):
-        remotepath = os.path.basename(self._sourcedir)
-        super(HostRSync, self).add_target(gateway, remotepath, 
-                                          finishedcallback=finished,
-                                          delete=True,)
-
-    def _report_send_file(self, gateway, modified_rel_path):
-        if self._verbose:
-            path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
-            remotepath = gateway.spec.joinpath
-            print '%s:%s <= %s' % (gateway.remoteaddress, remotepath, path)



More information about the pytest-commit mailing list