[py-svn] r37822 - in py/trunk/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Fri Feb 2 20:57:49 CET 2007


Author: hpk
Date: Fri Feb  2 20:57:47 2007
New Revision: 37822

Modified:
   py/trunk/py/execnet/gateway.py
   py/trunk/py/execnet/register.py
   py/trunk/py/execnet/testing/test_gateway.py
Log:
added lots of docstrings, general cleanup


Modified: py/trunk/py/execnet/gateway.py
==============================================================================
--- py/trunk/py/execnet/gateway.py	(original)
+++ py/trunk/py/execnet/gateway.py	Fri Feb  2 20:57:47 2007
@@ -33,23 +33,25 @@
 class Gateway(object):
     _ThreadOut = ThreadOut 
     remoteaddress = ""
-
-    def __init__(self, io, startcount=2, maxthreads=None):
+    def __init__(self, io, execthreads=None, _startcount=2): 
+        """ initialize core gateway, using the given 
+            inputoutput object and 'execthreads' execution
+            threads. 
+        """
         global registered_cleanup
-        self._execpool = WorkerPool(maxthreads=maxthreads) 
-##        self.running = True 
+        self._execpool = WorkerPool(maxthreads=execthreads)
         self._io = io
         self._outgoing = Queue.Queue()
-        self._channelfactory = ChannelFactory(self, startcount)
-##        self._exitlock = threading.Lock()
+        self._channelfactory = ChannelFactory(self, _startcount)
         if not registered_cleanup:
             atexit.register(cleanup_atexit)
             registered_cleanup = True
         _active_sendqueues[self._outgoing] = True
         self._pool = NamedThreadPool(receiver = self._thread_receiver, 
-                                    sender = self._thread_sender)
+                                     sender = self._thread_sender)
 
     def __repr__(self):
+        """ return string representing gateway type and status. """
         addr = self.remoteaddress 
         if addr:
             addr = '[%s]' % (addr,)
@@ -199,13 +201,12 @@
         return self._channelfactory.new()
 
     def remote_exec(self, source, stdout=None, stderr=None): 
-        """ return channel object for communicating with the asynchronously
-            executing 'source' code which will have a corresponding 'channel'
-            object in its executing namespace. 
-
-            You may provide callback functions 'stdout' and 'stderr' 
-            which will get called with the remote stdout/stderr output
-            piece by piece.
+        """ return channel object and connect it to a remote
+            execution thread where the given 'source' executes
+            and has the sister 'channel' object in its global 
+            namespace.  The callback functions 'stdout' and 
+            'stderr' get called on receipt of remote 
+            stdout/stderr output strings. 
         """
         try:
             source = str(Source(source))
@@ -252,29 +253,8 @@
                         c.waitclose(1.0) 
         return Handle()
 
-##    def exit(self):
-##        """ initiate full gateway teardown.   
-##            Note that the  teardown of sender/receiver threads happens 
-##            asynchronously and timeouts on stopping worker execution 
-##            threads are ignored.  You can issue join() or join(joinexec=False) 
-##            if you want to wait for a full teardown (possibly excluding 
-##            execution threads). 
-##        """ 
-##        # note that threads may still be scheduled to start
-##        # during our execution! 
-##        self._exitlock.acquire()
-##        try:
-##            if self.running: 
-##                self.running = False 
-##                if not self._pool.getstarted('sender'): 
-##                    raise IOError("sender thread not alive anymore!") 
-##                self._outgoing.put(None)
-##                self._trace("exit procedure triggered, pid %d " % (os.getpid(),))
-##                _gateways.remove(self) 
-##        finally:
-##            self._exitlock.release()
-
     def exit(self):
+        """ Try to stop all IO activity. """
         try:
             del _active_sendqueues[self._outgoing]
         except KeyError:
@@ -283,6 +263,9 @@
             self._outgoing.put(None)
 
     def join(self, joinexec=True):
+        """ Wait for all IO (and by default all execution activity) 
+            to stop. 
+        """
         current = threading.currentThread()
         for x in self._pool.getstarted(): 
             if x != current: 

Modified: py/trunk/py/execnet/register.py
==============================================================================
--- py/trunk/py/execnet/register.py	(original)
+++ py/trunk/py/execnet/register.py	Fri Feb  2 20:57:47 2007
@@ -7,9 +7,7 @@
 
 # the list of modules that must be send to the other side 
 # for bootstrapping gateways
-# XXX we want to have a cleaner bootstrap mechanism 
-#     by making sure early that we have the py lib available
-#     in a sufficient version 
+# XXX we'd like to have a leaner and meaner bootstrap mechanism 
 
 startup_modules = [
     'py.__.thread.io', 
@@ -30,7 +28,7 @@
     """ initialize gateways on both sides of a inputoutput object. """
     def __init__(self, io):
         self._remote_bootstrap_gateway(io)
-        super(InstallableGateway, self).__init__(io=io, startcount=1)
+        super(InstallableGateway, self).__init__(io=io, _startcount=1) 
 
     def _remote_bootstrap_gateway(self, io, extra=''):
         """ return Gateway with a asynchronously remotely
@@ -42,7 +40,9 @@
         """
         bootstrap = [extra]
         bootstrap += [getsource(x) for x in startup_modules]
-        bootstrap += [io.server_stmt, "Gateway(io=io, startcount=2).join(joinexec=False)",]
+        bootstrap += [io.server_stmt, 
+                      "Gateway(io=io, _startcount=2).join(joinexec=False)",
+                     ]
         source = "\n".join(bootstrap)
         self._trace("sending gateway bootstrap code")
         io.write('%r\n' % source)
@@ -52,45 +52,20 @@
         infile, outfile = os.popen2(cmd)
         io = inputoutput.Popen2IO(infile, outfile)
         super(PopenCmdGateway, self).__init__(io=io)
-##        self._pidchannel = self.remote_exec("""
-##            import os
-##            channel.send(os.getpid())
-##        """)
-
-##    def exit(self):
-##        try:
-##            self._pidchannel.waitclose(timeout=0.5)
-##            pid = self._pidchannel.receive()
-##        except IOError:
-##            self._trace("IOError: could not receive child PID:")
-##            self._traceex(sys.exc_info())
-##            pid = None
-##        super(PopenCmdGateway, self).exit()
-##        if pid is not None: 
-##            self._trace("waiting for pid %s" % pid)
-##            try:
-##                os.waitpid(pid, 0)
-##            except KeyboardInterrupt: 
-##                if sys.platform != "win32": 
-##                    os.kill(pid, 15) 
-##                raise
-##            except OSError, e:
-##                self._trace("child process %s already dead? error:%s" %
-##                           (pid, str(e)))
 
 class PopenGateway(PopenCmdGateway):
-    # use sysfind/sysexec/subprocess instead of os.popen?
+    """ This Gateway provides interaction with a newly started
+        python subprocess. 
+    """
     def __init__(self, python=sys.executable):
+        """ instantiate a gateway to a subprocess 
+            started with the given 'python' executable. 
+        """
         cmd = '%s -u -c "exec input()"' % python
         super(PopenGateway, self).__init__(cmd)
 
     def _remote_bootstrap_gateway(self, io, extra=''):
-        # XXX the following hack helps us to import the same version
-        #     of the py lib and other dependcies, but only works for 
-        #     PopenGateways because we can assume to have access to 
-        #     the same filesystem 
-        #     --> we definitely need proper remote imports working
-        #         across any kind of gateway!
+        # have the subprocess use the same PYTHONPATH and py lib 
         x = py.path.local(py.__file__).dirpath().dirpath()
         ppath = os.environ.get('PYTHONPATH', '')
         plist = [str(x)] + ppath.split(':')
@@ -98,48 +73,66 @@
             "import sys ; sys.path[:0] = %r" % (plist,), 
             "import os ; os.environ['PYTHONPATH'] = %r" % ppath, 
             str(py.code.Source(stdouterrin_setnull)), 
-            "stdouterrin_setnull()",
+            "stdouterrin_setnull()", 
             ""
             ])
         super(PopenGateway, self)._remote_bootstrap_gateway(io, s)
 
 class SocketGateway(InstallableGateway):
+    """ This Gateway provides interaction with a remote process
+        by connecting to a specified socket.  On the remote
+        side you need to manually start a small script 
+        (py/execnet/script/socketserver.py) that accepts
+        SocketGateway connections. 
+    """
     def __init__(self, host, port):
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        """ instantiate a gateway to a process accessed
+            via a host/port specified socket. 
+        """
         self.host = host = str(host)
         self.port = port = int(port)
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect((host, port))
         io = inputoutput.SocketIO(sock)
         super(SocketGateway, self).__init__(io=io)
         self.remoteaddress = '%s:%d' % (self.host, self.port)
 
-    def remote_install(cls, gateway, hostport=None): 
-        """ return a connected socket gateway through the
-            given gateway. 
+    def new_remote(cls, gateway, hostport=None): 
+        """ return a new (connected) socket gateway, instantiated
+            indirectly through the given 'gateway'. 
         """ 
         if hostport is None: 
             host, port = ('', 0)  # XXX works on all platforms? 
         else:   
             host, port = hostport 
         socketserverbootstrap = py.code.Source(
-            mypath.dirpath('script', 'socketserver.py').read('rU'),
-            """
+            mypath.dirpath('script', 'socketserver.py').read('rU'), """
             import socket
             sock = bind_and_listen((%r, %r)) 
             port = sock.getsockname()
             channel.send(port) 
             startserver(sock)
-        """ % (host, port)) 
+            """ % (host, port)
+        ) 
         # execute the above socketserverbootstrap on the other side
         channel = gateway.remote_exec(socketserverbootstrap)
         (realhost, realport) = channel.receive()
-        #gateway._trace("remote_install received" 
-        #              "port=%r, hostname = %r" %(realport, hostname))
+        #gateway._trace("new_remote received" 
+        #               "port=%r, hostname = %r" %(realport, hostname))
         return py.execnet.SocketGateway(host, realport) 
-    remote_install = classmethod(remote_install)
+    new_remote = classmethod(new_remote)
+
     
 class SshGateway(PopenCmdGateway):
+    """ This Gateway provides interaction with a remote process,
+        established via the 'ssh' command line binary.  
+        The remote side needs to have a Python interpreter executable. 
+    """
     def __init__(self, sshaddress, remotepython='python', identity=None): 
+        """ instantiate a remote ssh process with the 
+            given 'sshaddress' and remotepython version.
+            you may specify an 'identity' filepath. 
+        """
         self.remoteaddress = sshaddress
         remotecmd = '%s -u -c "exec input()"' % (remotepython,)
         cmdline = [sshaddress, remotecmd]
@@ -160,8 +153,11 @@
         ])
         super(SshGateway, self)._remote_bootstrap_gateway(io, extra)
 
+
 def stdouterrin_setnull():
-    # redirect file descriptors 0 and 1 to /dev/null, to avoid
+    """ redirect file descriptors 0 and 1 (and possibly 2) to /dev/null. 
+        note that this function may run remotely without py lib support. 
+    """
     # complete confusion (this is independent from the sys.stdout
     # and sys.stderr redirection that gateway.remote_exec() can do)
     # note that we redirect fd 2 on win too, since for some reason that
@@ -188,55 +184,3 @@
         os.dup2(fd, 2)
     os.close(fd)
 
-# XXX
-# XXX unusued code below
-# XXX
-
-class ExecGateway(PopenGateway):
-    def remote_exec_sync_stdcapture(self, lines, callback):
-        # hack: turn the content of the cell into
-        #
-        # if 1:
-        #    line1
-        #    line2
-        #    ...
-        #
-        lines = ['   ' + line for line in lines]
-        lines.insert(0, 'if 1:')
-        lines.append('')
-        sourcecode = '\n'.join(lines)
-        try:
-            callbacks = self.callbacks
-        except AttributeError:
-            callbacks = self.callbacks = {}
-        answerid = id(callback)
-        self.callbacks[answerid] = callback
-
-        self.exec_remote('''
-            import sys, StringIO
-            try:
-                execns
-            except:
-                execns = {}
-            oldout, olderr = sys.stdout, sys.stderr
-            try:
-                buffer = StringIO.StringIO()
-                sys.stdout = sys.stderr = buffer
-                try:
-                    exec compile(%(sourcecode)r, 'single') in execns
-                except:
-                    import traceback
-                    traceback.print_exc()
-            finally:
-                sys.stdout=oldout
-                sys.stderr=olderr
-            # fiddle us (the caller) into executing the callback on remote answers
-            gateway.exec_remote(
-                "gateway.invoke_callback(%(answerid)r, %%r)" %% buffer.getvalue())
-        ''' % locals())
-
-    def invoke_callback(self, answerid, value):
-        callback = self.callbacks[answerid]
-        del self.callbacks[answerid]
-        callback(value)
-

Modified: py/trunk/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py	(original)
+++ py/trunk/py/execnet/testing/test_gateway.py	Fri Feb  2 20:57:47 2007
@@ -302,10 +302,11 @@
     def test_remote_exec_redirect_multi(self): 
         num = 3
         l = [[] for x in range(num)]
-        channels = [self.gw.remote_exec("print %d" % i, stdout=l[i].append)
+        channels = [self.gw.remote_exec("print %d" % i, 
+                                        stdout=l[i].append)
                         for i in range(num)]
         for x in channels: 
-            x.waitclose(1.0) 
+            x.waitclose(5.0) 
 
         for i in range(num): 
             subl = l[i] 
@@ -451,9 +452,9 @@
     def setup_class(cls):
         # open a gateway to a fresh child process
         cls.proxygw = py.execnet.PopenGateway() 
-        cls.gw = py.execnet.SocketGateway.remote_install(cls.proxygw,
-                                                         ("127.0.0.1", 0)
-                                                            ) 
+        cls.gw = py.execnet.SocketGateway.new_remote(cls.proxygw,
+                                                     ("127.0.0.1", 0)
+                                                     ) 
 
 ##    def teardown_class(cls):
 ##        cls.gw.exit()



More information about the pytest-commit mailing list