[Python-checkins] cpython: Issue #18923: Update subprocess to use the new selectors module.

charles-francois.natali python-checkins at python.org
Fri Nov 8 19:55:46 CET 2013


http://hg.python.org/cpython/rev/71b618f0c8e9
changeset:   87008:71b618f0c8e9
user:        Charles-François Natali <cf.natali at gmail.com>
date:        Fri Nov 08 19:56:59 2013 +0100
summary:
  Issue #18923: Update subprocess to use the new selectors module.

files:
  Lib/subprocess.py           |  238 ++++++-----------------
  Lib/test/test_subprocess.py |   10 +-
  2 files changed, 75 insertions(+), 173 deletions(-)


diff --git a/Lib/subprocess.py b/Lib/subprocess.py
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -404,15 +404,23 @@
         hStdError = None
         wShowWindow = 0
 else:
+    import _posixsubprocess
     import select
-    _has_poll = hasattr(select, 'poll')
-    import _posixsubprocess
+    import selectors
 
     # When select or poll has indicated that the file is writable,
     # we can write up to _PIPE_BUF bytes without risk of blocking.
     # POSIX defines PIPE_BUF as >= 512.
     _PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
 
+    # poll/select have the advantage of not requiring any extra file
+    # descriptor, contrarily to epoll/kqueue (also, they require a single
+    # syscall).
+    if hasattr(selectors, 'PollSelector'):
+        _PopenSelector = selectors.PollSelector
+    else:
+        _PopenSelector = selectors.SelectSelector
+
 
 __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
            "getoutput", "check_output", "CalledProcessError", "DEVNULL"]
@@ -1530,12 +1538,65 @@
                 if not input:
                     self.stdin.close()
 
-            if _has_poll:
-                stdout, stderr = self._communicate_with_poll(input, endtime,
-                                                             orig_timeout)
-            else:
-                stdout, stderr = self._communicate_with_select(input, endtime,
-                                                               orig_timeout)
+            stdout = None
+            stderr = None
+
+            # Only create this mapping if we haven't already.
+            if not self._communication_started:
+                self._fileobj2output = {}
+                if self.stdout:
+                    self._fileobj2output[self.stdout] = []
+                if self.stderr:
+                    self._fileobj2output[self.stderr] = []
+
+            if self.stdout:
+                stdout = self._fileobj2output[self.stdout]
+            if self.stderr:
+                stderr = self._fileobj2output[self.stderr]
+
+            self._save_input(input)
+
+            with _PopenSelector() as selector:
+                if self.stdin and input:
+                    selector.register(self.stdin, selectors.EVENT_WRITE)
+                if self.stdout:
+                    selector.register(self.stdout, selectors.EVENT_READ)
+                if self.stderr:
+                    selector.register(self.stderr, selectors.EVENT_READ)
+
+                while selector.get_map():
+                    timeout = self._remaining_time(endtime)
+                    if timeout is not None and timeout < 0:
+                        raise TimeoutExpired(self.args, orig_timeout)
+
+                    ready = selector.select(timeout)
+                    self._check_timeout(endtime, orig_timeout)
+
+                    # XXX Rewrite these to use non-blocking I/O on the file
+                    # objects; they are no longer using C stdio!
+
+                    for key, events in ready:
+                        if key.fileobj is self.stdin:
+                            chunk = self._input[self._input_offset :
+                                                self._input_offset + _PIPE_BUF]
+                            try:
+                                self._input_offset += os.write(key.fd, chunk)
+                            except OSError as e:
+                                if e.errno == errno.EPIPE:
+                                    selector.unregister(key.fileobj)
+                                    key.fileobj.close()
+                                else:
+                                    raise
+                            else:
+                                if self._input_offset >= len(self._input):
+                                    selector.unregister(key.fileobj)
+                                    key.fileobj.close()
+                        elif key.fileobj in (self.stdout, self.stderr):
+                            data = os.read(key.fd, 4096)
+                            if not data:
+                                selector.unregister(key.fileobj)
+                                key.fileobj.close()
+                            self._fileobj2output[key.fileobj].append(data)
 
             self.wait(timeout=self._remaining_time(endtime))
 
@@ -1569,167 +1630,6 @@
                     self._input = self._input.encode(self.stdin.encoding)
 
 
-        def _communicate_with_poll(self, input, endtime, orig_timeout):
-            stdout = None # Return
-            stderr = None # Return
-
-            if not self._communication_started:
-                self._fd2file = {}
-
-            poller = select.poll()
-            def register_and_append(file_obj, eventmask):
-                poller.register(file_obj.fileno(), eventmask)
-                self._fd2file[file_obj.fileno()] = file_obj
-
-            def close_unregister_and_remove(fd):
-                poller.unregister(fd)
-                self._fd2file[fd].close()
-                self._fd2file.pop(fd)
-
-            if self.stdin and input:
-                register_and_append(self.stdin, select.POLLOUT)
-
-            # Only create this mapping if we haven't already.
-            if not self._communication_started:
-                self._fd2output = {}
-                if self.stdout:
-                    self._fd2output[self.stdout.fileno()] = []
-                if self.stderr:
-                    self._fd2output[self.stderr.fileno()] = []
-
-            select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
-            if self.stdout:
-                register_and_append(self.stdout, select_POLLIN_POLLPRI)
-                stdout = self._fd2output[self.stdout.fileno()]
-            if self.stderr:
-                register_and_append(self.stderr, select_POLLIN_POLLPRI)
-                stderr = self._fd2output[self.stderr.fileno()]
-
-            self._save_input(input)
-
-            while self._fd2file:
-                timeout = self._remaining_time(endtime)
-                if timeout is not None and timeout < 0:
-                    raise TimeoutExpired(self.args, orig_timeout)
-                try:
-                    ready = poller.poll(timeout)
-                except OSError as e:
-                    if e.args[0] == errno.EINTR:
-                        continue
-                    raise
-                self._check_timeout(endtime, orig_timeout)
-
-                # XXX Rewrite these to use non-blocking I/O on the
-                # file objects; they are no longer using C stdio!
-
-                for fd, mode in ready:
-                    if mode & select.POLLOUT:
-                        chunk = self._input[self._input_offset :
-                                            self._input_offset + _PIPE_BUF]
-                        try:
-                            self._input_offset += os.write(fd, chunk)
-                        except OSError as e:
-                            if e.errno == errno.EPIPE:
-                                close_unregister_and_remove(fd)
-                            else:
-                                raise
-                        else:
-                            if self._input_offset >= len(self._input):
-                                close_unregister_and_remove(fd)
-                    elif mode & select_POLLIN_POLLPRI:
-                        data = os.read(fd, 4096)
-                        if not data:
-                            close_unregister_and_remove(fd)
-                        self._fd2output[fd].append(data)
-                    else:
-                        # Ignore hang up or errors.
-                        close_unregister_and_remove(fd)
-
-            return (stdout, stderr)
-
-
-        def _communicate_with_select(self, input, endtime, orig_timeout):
-            if not self._communication_started:
-                self._read_set = []
-                self._write_set = []
-                if self.stdin and input:
-                    self._write_set.append(self.stdin)
-                if self.stdout:
-                    self._read_set.append(self.stdout)
-                if self.stderr:
-                    self._read_set.append(self.stderr)
-
-            self._save_input(input)
-
-            stdout = None # Return
-            stderr = None # Return
-
-            if self.stdout:
-                if not self._communication_started:
-                    self._stdout_buff = []
-                stdout = self._stdout_buff
-            if self.stderr:
-                if not self._communication_started:
-                    self._stderr_buff = []
-                stderr = self._stderr_buff
-
-            while self._read_set or self._write_set:
-                timeout = self._remaining_time(endtime)
-                if timeout is not None and timeout < 0:
-                    raise TimeoutExpired(self.args, orig_timeout)
-                try:
-                    (rlist, wlist, xlist) = \
-                        select.select(self._read_set, self._write_set, [],
-                                      timeout)
-                except OSError as e:
-                    if e.args[0] == errno.EINTR:
-                        continue
-                    raise
-
-                # According to the docs, returning three empty lists indicates
-                # that the timeout expired.
-                if not (rlist or wlist or xlist):
-                    raise TimeoutExpired(self.args, orig_timeout)
-                # We also check what time it is ourselves for good measure.
-                self._check_timeout(endtime, orig_timeout)
-
-                # XXX Rewrite these to use non-blocking I/O on the
-                # file objects; they are no longer using C stdio!
-
-                if self.stdin in wlist:
-                    chunk = self._input[self._input_offset :
-                                        self._input_offset + _PIPE_BUF]
-                    try:
-                        bytes_written = os.write(self.stdin.fileno(), chunk)
-                    except OSError as e:
-                        if e.errno == errno.EPIPE:
-                            self.stdin.close()
-                            self._write_set.remove(self.stdin)
-                        else:
-                            raise
-                    else:
-                        self._input_offset += bytes_written
-                        if self._input_offset >= len(self._input):
-                            self.stdin.close()
-                            self._write_set.remove(self.stdin)
-
-                if self.stdout in rlist:
-                    data = os.read(self.stdout.fileno(), 1024)
-                    if not data:
-                        self.stdout.close()
-                        self._read_set.remove(self.stdout)
-                    stdout.append(data)
-
-                if self.stderr in rlist:
-                    data = os.read(self.stderr.fileno(), 1024)
-                    if not data:
-                        self.stderr.close()
-                        self._read_set.remove(self.stderr)
-                    stderr.append(data)
-
-            return (stdout, stderr)
-
-
         def send_signal(self, sig):
             """Send a signal to the process
             """
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -11,6 +11,7 @@
 import tempfile
 import time
 import re
+import selectors
 import sysconfig
 import warnings
 import select
@@ -2179,15 +2180,16 @@
                 os.rmdir(dir)
 
 
-@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
-                     "poll system call not supported")
+@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
+                     "Test needs selectors.PollSelector")
 class ProcessTestCaseNoPoll(ProcessTestCase):
     def setUp(self):
-        subprocess._has_poll = False
+        self.orig_selector = subprocess._PopenSelector
+        subprocess._PopenSelector = selectors.SelectSelector
         ProcessTestCase.setUp(self)
 
     def tearDown(self):
-        subprocess._has_poll = True
+        subprocess._PopenSelector = self.orig_selector
         ProcessTestCase.tearDown(self)
 
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list