[Python-checkins] cpython: asyncio: Add new file (forgotten).

guido.van.rossum python-checkins at python.org
Wed Oct 30 22:56:55 CET 2013


http://hg.python.org/cpython/rev/a28f3eebe8ae
changeset:   86789:a28f3eebe8ae
user:        Guido van Rossum <guido at dropbox.com>
date:        Wed Oct 30 14:56:49 2013 -0700
summary:
  asyncio: Add new file (forgotten).

files:
  Lib/asyncio/base_subprocess.py |  166 +++++++++++++++++++++
  1 files changed, 166 insertions(+), 0 deletions(-)


diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
new file mode 100644
--- /dev/null
+++ b/Lib/asyncio/base_subprocess.py
@@ -0,0 +1,166 @@
+import collections
+import subprocess
+
+from . import protocols
+from . import tasks
+from . import transports
+
+
+STDIN = 0
+STDOUT = 1
+STDERR = 2
+
+
+class BaseSubprocessTransport(transports.SubprocessTransport):
+
+    def __init__(self, loop, protocol, args, shell,
+                 stdin, stdout, stderr, bufsize,
+                 extra=None, **kwargs):
+        super().__init__(extra)
+        self._protocol = protocol
+        self._loop = loop
+
+        self._pipes = {}
+        if stdin == subprocess.PIPE:
+            self._pipes[STDIN] = None
+        if stdout == subprocess.PIPE:
+            self._pipes[STDOUT] = None
+        if stderr == subprocess.PIPE:
+            self._pipes[STDERR] = None
+        self._pending_calls = collections.deque()
+        self._finished = False
+        self._returncode = None
+        self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
+                    stderr=stderr, bufsize=bufsize, **kwargs)
+        self._extra['subprocess'] = self._proc
+
+    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+        raise NotImplementedError
+
+    def _make_write_subprocess_pipe_proto(self, fd):
+        raise NotImplementedError
+
+    def _make_read_subprocess_pipe_proto(self, fd):
+        raise NotImplementedError
+
+    def close(self):
+        for proto in self._pipes.values():
+            proto.pipe.close()
+        if self._returncode is None:
+            self.terminate()
+
+    def get_pid(self):
+        return self._proc.pid
+
+    def get_returncode(self):
+        return self._returncode
+
+    def get_pipe_transport(self, fd):
+        if fd in self._pipes:
+            return self._pipes[fd].pipe
+        else:
+            return None
+
+    def send_signal(self, signal):
+        self._proc.send_signal(signal)
+
+    def terminate(self):
+        self._proc.terminate()
+
+    def kill(self):
+        self._proc.kill()
+
+    @tasks.coroutine
+    def _post_init(self):
+        proc = self._proc
+        loop = self._loop
+        if proc.stdin is not None:
+            transp, proto = yield from loop.connect_write_pipe(
+                lambda: WriteSubprocessPipeProto(self, STDIN),
+                proc.stdin)
+        if proc.stdout is not None:
+            transp, proto = yield from loop.connect_read_pipe(
+                lambda: ReadSubprocessPipeProto(self, STDOUT),
+                proc.stdout)
+        if proc.stderr is not None:
+            transp, proto = yield from loop.connect_read_pipe(
+                lambda: ReadSubprocessPipeProto(self, STDERR),
+                proc.stderr)
+        if not self._pipes:
+            self._try_connected()
+
+    def _call(self, cb, *data):
+        if self._pending_calls is not None:
+            self._pending_calls.append((cb, data))
+        else:
+            self._loop.call_soon(cb, *data)
+
+    def _try_connected(self):
+        assert self._pending_calls is not None
+        if all(p is not None and p.connected for p in self._pipes.values()):
+            self._loop.call_soon(self._protocol.connection_made, self)
+            for callback, data in self._pending_calls:
+                self._loop.call_soon(callback, *data)
+            self._pending_calls = None
+
+    def _pipe_connection_lost(self, fd, exc):
+        self._call(self._protocol.pipe_connection_lost, fd, exc)
+        self._try_finish()
+
+    def _pipe_data_received(self, fd, data):
+        self._call(self._protocol.pipe_data_received, fd, data)
+
+    def _process_exited(self, returncode):
+        assert returncode is not None, returncode
+        assert self._returncode is None, self._returncode
+        self._returncode = returncode
+        self._loop._subprocess_closed(self)
+        self._call(self._protocol.process_exited)
+        self._try_finish()
+
+    def _try_finish(self):
+        assert not self._finished
+        if self._returncode is None:
+            return
+        if all(p is not None and p.disconnected
+               for p in self._pipes.values()):
+            self._finished = True
+            self._loop.call_soon(self._call_connection_lost, None)
+
+    def _call_connection_lost(self, exc):
+        try:
+            self._protocol.connection_lost(exc)
+        finally:
+            self._proc = None
+            self._protocol = None
+            self._loop = None
+
+
+class WriteSubprocessPipeProto(protocols.BaseProtocol):
+    pipe = None
+
+    def __init__(self, proc, fd):
+        self.proc = proc
+        self.fd = fd
+        self.connected = False
+        self.disconnected = False
+        proc._pipes[fd] = self
+
+    def connection_made(self, transport):
+        self.connected = True
+        self.pipe = transport
+        self.proc._try_connected()
+
+    def connection_lost(self, exc):
+        self.disconnected = True
+        self.proc._pipe_connection_lost(self.fd, exc)
+
+    def eof_received(self):
+        pass
+
+
+class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
+                              protocols.Protocol):
+
+    def data_received(self, data):
+        self.proc._pipe_data_received(self.fd, data)

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list