[Python-checkins] cpython: Add a 'timeout' argument to subprocess.Popen.

reid.kleckner python-checkins at python.org
Mon Mar 14 17:16:11 CET 2011


http://hg.python.org/cpython/rev/c4a0fa6e687c
changeset:   68454:c4a0fa6e687c
parent:      68447:c919a82b5a40
user:        Reid Kleckner <reid at kleckner.net>
date:        Mon Mar 14 12:02:10 2011 -0400
summary:
  Add a 'timeout' argument to subprocess.Popen.

If the timeout expires before the subprocess exits, the wait method and the
communicate method will raise a subprocess.TimeoutExpired exception.  When used
with communicate, it is possible to catch the exception, kill the process, and
retry the communicate and receive any output written to stdout or stderr.
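
The catch, kill, and retry pattern described above might look roughly like
the following. This is a minimal sketch and not part of the changeset: the
helper name run_with_timeout, the child command, and the 3-second timeout are
all hypothetical and chosen only for illustration.

```python
import subprocess
import sys


def run_with_timeout(cmd, timeout):
    """Run cmd and return (stdout, stderr, timed_out).

    Hypothetical helper: if the timeout expires, kill the child and call
    communicate() again to collect whatever it had already written.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        outs, errs = proc.communicate(timeout=timeout)
        return outs, errs, False
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout, so do it here,
        # then retry without a timeout to drain the pipes and reap the child.
        proc.kill()
        outs, errs = proc.communicate()
        return outs, errs, True


# Illustrative child: writes some output, flushes it, then sleeps well past
# the timeout.
child = [sys.executable, "-c",
         "import sys, time; sys.stdout.write('partial'); "
         "sys.stdout.flush(); time.sleep(30)"]
outs, errs, timed_out = run_with_timeout(child, timeout=3.0)
```

Retrying communicate() after kill() is what keeps the output written before
the timeout from being lost; it also waits for the child, so no zombie
process is left behind.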

files:
  Doc/library/subprocess.rst
  Lib/subprocess.py
  Lib/test/test_subprocess.py
  PC/_subprocess.c

diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst
--- a/Doc/library/subprocess.rst
+++ b/Doc/library/subprocess.rst
@@ -249,15 +249,21 @@
 This module also defines four shortcut functions:
 
 
-.. function:: call(*popenargs, **kwargs)
+.. function:: call(*popenargs, timeout=None, **kwargs)
 
    Run command with arguments.  Wait for command to complete, then return the
    :attr:`returncode` attribute.
 
-   The arguments are the same as for the :class:`Popen` constructor.  Example::
+   The arguments are the same as for the :class:`Popen` constructor, with the
+   exception of the *timeout* argument, which is given to :meth:`Popen.wait`.
+   Example::
 
       >>> retcode = subprocess.call(["ls", "-l"])
 
+   If the timeout expires, the child process will be killed and then waited for
+   again.  The :exc:`TimeoutExpired` exception will be re-raised after the child
+   process has terminated.
+
    .. warning::
 
       Like :meth:`Popen.wait`, this will deadlock when using
@@ -265,34 +271,43 @@
       generates enough output to a pipe such that it blocks waiting
       for the OS pipe buffer to accept more data.
 
+   .. versionchanged:: 3.2
+      *timeout* was added.
 
-.. function:: check_call(*popenargs, **kwargs)
+
+.. function:: check_call(*popenargs, timeout=None, **kwargs)
 
    Run command with arguments.  Wait for command to complete. If the exit code was
    zero then return, otherwise raise :exc:`CalledProcessError`. The
    :exc:`CalledProcessError` object will have the return code in the
    :attr:`returncode` attribute.
 
-   The arguments are the same as for the :class:`Popen` constructor.  Example::
+   The arguments are the same as for the :func:`call` function.  Example::
 
       >>> subprocess.check_call(["ls", "-l"])
       0
 
+   As in the :func:`call` function, if the timeout expires, the child process
+   will be killed and the wait retried.  The :exc:`TimeoutExpired` exception
+   will be re-raised after the child process has terminated.
+
    .. warning::
 
       See the warning for :func:`call`.
 
+   .. versionchanged:: 3.2
+      *timeout* was added.
 
-.. function:: check_output(*popenargs, **kwargs)
+
+.. function:: check_output(*popenargs, timeout=None, **kwargs)
 
    Run command with arguments and return its output as a byte string.
 
    If the exit code was non-zero it raises a :exc:`CalledProcessError`.  The
    :exc:`CalledProcessError` object will have the return code in the
-   :attr:`returncode`
-   attribute and output in the :attr:`output` attribute.
+   :attr:`returncode` attribute and output in the :attr:`output` attribute.
 
-   The arguments are the same as for the :class:`Popen` constructor.  Example::
+   The arguments are the same as for the :func:`call` function.  Example::
 
       >>> subprocess.check_output(["ls", "-l", "/dev/null"])
       b'crw-rw-rw- 1 root root 1, 3 Oct 18  2007 /dev/null\n'
@@ -305,8 +320,17 @@
       ...     stderr=subprocess.STDOUT)
       b'ls: non_existent_file: No such file or directory\n'
 
+   As in the :func:`call` function, if the timeout expires, the child process
+   will be killed and the wait retried.  The :exc:`TimeoutExpired` exception
+   will be re-raised after the child process has terminated.  The output from
+   the child process so far will be in the :attr:`output` attribute of the
+   exception.
+
    .. versionadded:: 3.1
 
+   .. versionchanged:: 3.2
+      *timeout* was added.
+
 
 .. function:: getstatusoutput(cmd)
 
@@ -359,6 +383,10 @@
 check_call() will raise :exc:`CalledProcessError`, if the called process returns
 a non-zero return code.
 
+All of the functions and methods that accept a *timeout* parameter, such as
+:func:`call` and :meth:`Popen.communicate`, will raise :exc:`TimeoutExpired` if
+the timeout expires before the process exits.
+
 
 Security
 ^^^^^^^^
@@ -380,11 +408,15 @@
    attribute.
 
 
-.. method:: Popen.wait()
+.. method:: Popen.wait(timeout=None)
 
    Wait for child process to terminate.  Set and return :attr:`returncode`
    attribute.
 
+   If the process does not terminate after *timeout* seconds, raise a
+   :exc:`TimeoutExpired` exception.  It is safe to catch this exception and
+   retry the wait.
+
    .. warning::
 
       This will deadlock when using ``stdout=PIPE`` and/or
@@ -392,11 +424,14 @@
       a pipe such that it blocks waiting for the OS pipe buffer to
       accept more data.  Use :meth:`communicate` to avoid that.
 
+   .. versionchanged:: 3.2
+      *timeout* was added.
 
-.. method:: Popen.communicate(input=None)
+
+.. method:: Popen.communicate(input=None, timeout=None)
 
    Interact with process: Send data to stdin.  Read data from stdout and stderr,
-   until end-of-file is reached.  Wait for process to terminate. The optional
+   until end-of-file is reached.  Wait for process to terminate.  The optional
    *input* argument should be a byte string to be sent to the child process, or
    ``None``, if no data should be sent to the child.
 
@@ -407,11 +442,29 @@
    ``None`` in the result tuple, you need to give ``stdout=PIPE`` and/or
    ``stderr=PIPE`` too.
 
+   If the process does not terminate after *timeout* seconds, a
+   :exc:`TimeoutExpired` exception will be raised.  Catching this exception and
+   retrying communication will not lose any output.
+
+   The child process is not killed if the timeout expires, so in order to
+   clean up properly a well-behaved application should kill the child process
+   and finish communication::
+
+      proc = subprocess.Popen(...)
+      try:
+          outs, errs = proc.communicate(timeout=15)
+      except TimeoutExpired:
+          proc.kill()
+          outs, errs = proc.communicate()
+
    .. note::
 
       The data read is buffered in memory, so do not use this method if the data
       size is large or unlimited.
 
+   .. versionchanged:: 3.2
+      *timeout* was added.
+
 
 .. method:: Popen.send_signal(signal)
 
diff --git a/Lib/subprocess.py b/Lib/subprocess.py
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -340,6 +340,7 @@
 
 import io
 import os
+import time
 import traceback
 import gc
 import signal
@@ -361,6 +362,19 @@
         return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
 
 
+class TimeoutExpired(Exception):
+    """This exception is raised when the timeout expires while waiting for a
+    child process.
+    """
+    def __init__(self, cmd, output=None):
+        self.cmd = cmd
+        self.output = output
+
+    def __str__(self):
+        return ("Command '%s' timed out after %s seconds" %
+                (self.cmd, self.timeout))
+
+
 if mswindows:
     import threading
     import msvcrt
@@ -449,15 +463,21 @@
             raise
 
 
-def call(*popenargs, **kwargs):
-    """Run command with arguments.  Wait for command to complete, then
-    return the returncode attribute.
+def call(*popenargs, timeout=None, **kwargs):
+    """Run command with arguments.  Wait for command to complete or
+    timeout, then return the returncode attribute.
 
     The arguments are the same as for the Popen constructor.  Example:
 
     retcode = call(["ls", "-l"])
     """
-    return Popen(*popenargs, **kwargs).wait()
+    p = Popen(*popenargs, **kwargs)
+    try:
+        return p.wait(timeout=timeout)
+    except TimeoutExpired:
+        p.kill()
+        p.wait()
+        raise
 
 
 def check_call(*popenargs, **kwargs):
@@ -466,7 +486,7 @@
     CalledProcessError.  The CalledProcessError object will have the
     return code in the returncode attribute.
 
-    The arguments are the same as for the Popen constructor.  Example:
+    The arguments are the same as for the call function.  Example:
 
     check_call(["ls", "-l"])
     """
@@ -479,7 +499,7 @@
     return 0
 
 
-def check_output(*popenargs, **kwargs):
+def check_output(*popenargs, timeout=None, **kwargs):
     r"""Run command with arguments and return its output as a byte string.
 
     If the exit code was non-zero it raises a CalledProcessError.  The
@@ -502,13 +522,15 @@
     if 'stdout' in kwargs:
         raise ValueError('stdout argument not allowed, it will be overridden.')
     process = Popen(*popenargs, stdout=PIPE, **kwargs)
-    output, unused_err = process.communicate()
+    try:
+        output, unused_err = process.communicate(timeout=timeout)
+    except TimeoutExpired:
+        process.kill()
+        output, unused_err = process.communicate()
+        raise TimeoutExpired(process.args, output=output)
     retcode = process.poll()
     if retcode:
-        cmd = kwargs.get("args")
-        if cmd is None:
-            cmd = popenargs[0]
-        raise CalledProcessError(retcode, cmd, output=output)
+        raise CalledProcessError(retcode, process.args, output=output)
     return output
 
 
@@ -639,6 +661,8 @@
         _cleanup()
 
         self._child_created = False
+        self._input = None
+        self._communication_started = False
         if bufsize is None:
             bufsize = 0  # Restore default
         if not isinstance(bufsize, int):
@@ -673,6 +697,7 @@
                 raise ValueError("creationflags is only supported on Windows "
                                  "platforms")
 
+        self.args = args
         self.stdin = None
         self.stdout = None
         self.stderr = None
@@ -771,7 +796,7 @@
             _active.append(self)
 
 
-    def communicate(self, input=None):
+    def communicate(self, input=None, timeout=None):
         """Interact with process: Send data to stdin.  Read data from
         stdout and stderr, until end-of-file is reached.  Wait for
         process to terminate.  The optional input argument should be a
@@ -780,9 +805,19 @@
 
         communicate() returns a tuple (stdout, stderr)."""
 
-        # Optimization: If we are only using one pipe, or no pipe at
-        # all, using select() or threads is unnecessary.
-        if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
+        if self._communication_started and input:
+            raise ValueError("Cannot send input after starting communication")
+
+        if timeout is not None:
+            endtime = time.time() + timeout
+        else:
+            endtime = None
+
+        # Optimization: If we are not worried about timeouts, we haven't
+        # started communicating, and we have one or zero pipes, using select()
+        # or threads is unnecessary.
+        if (endtime is None and not self._communication_started and
+            [self.stdin, self.stdout, self.stderr].count(None) >= 2):
             stdout = None
             stderr = None
             if self.stdin:
@@ -798,13 +833,36 @@
             self.wait()
             return (stdout, stderr)
 
-        return self._communicate(input)
+        try:
+            stdout, stderr = self._communicate(input, endtime)
+        finally:
+            self._communication_started = True
+
+        sts = self.wait(timeout=self._remaining_time(endtime))
+
+        return (stdout, stderr)
 
 
     def poll(self):
         return self._internal_poll()
 
 
+    def _remaining_time(self, endtime):
+        """Convenience for _communicate when computing timeouts."""
+        if endtime is None:
+            return None
+        else:
+            return endtime - time.time()
+
+
+    def _check_timeout(self, endtime):
+        """Convenience for checking if a timeout has expired."""
+        if endtime is None:
+            return
+        if time.time() > endtime:
+            raise TimeoutExpired(self.args)
+
+
     if mswindows:
         #
         # Windows methods
@@ -987,12 +1045,17 @@
             return self.returncode
 
 
-        def wait(self):
+        def wait(self, timeout=None):
             """Wait for child process to terminate.  Returns returncode
             attribute."""
+            if timeout is None:
+                timeout = _subprocess.INFINITE
+            else:
+                timeout = int(timeout * 1000)
             if self.returncode is None:
-                _subprocess.WaitForSingleObject(self._handle,
-                                                _subprocess.INFINITE)
+                result = _subprocess.WaitForSingleObject(self._handle, timeout)
+                if result == _subprocess.WAIT_TIMEOUT:
+                    raise TimeoutExpired(self.args)
                 self.returncode = _subprocess.GetExitCodeProcess(self._handle)
             return self.returncode
 
@@ -1002,32 +1065,51 @@
             fh.close()
 
 
-        def _communicate(self, input):
-            stdout = None # Return
-            stderr = None # Return
-
-            if self.stdout:
-                stdout = []
-                stdout_thread = threading.Thread(target=self._readerthread,
-                                                 args=(self.stdout, stdout))
-                stdout_thread.daemon = True
-                stdout_thread.start()
-            if self.stderr:
-                stderr = []
-                stderr_thread = threading.Thread(target=self._readerthread,
-                                                 args=(self.stderr, stderr))
-                stderr_thread.daemon = True
-                stderr_thread.start()
+        def _communicate(self, input, endtime):
+            # Start reader threads feeding into a list hanging off of this
+            # object, unless they've already been started.
+            if self.stdout and not hasattr(self, "_stdout_buff"):
+                self._stdout_buff = []
+                self.stdout_thread = \
+                        threading.Thread(target=self._readerthread,
+                                         args=(self.stdout, self._stdout_buff))
+                self.stdout_thread.daemon = True
+                self.stdout_thread.start()
+            if self.stderr and not hasattr(self, "_stderr_buff"):
+                self._stderr_buff = []
+                self.stderr_thread = \
+                        threading.Thread(target=self._readerthread,
+                                         args=(self.stderr, self._stderr_buff))
+                self.stderr_thread.daemon = True
+                self.stderr_thread.start()
 
             if self.stdin:
                 if input is not None:
                     self.stdin.write(input)
                 self.stdin.close()
 
+            # Wait for the reader threads, or time out.  If we time out, the
+            # threads remain reading and the fds are left open in case the user
+            # calls communicate again.
+            if self.stdout is not None:
+                self.stdout_thread.join(self._remaining_time(endtime))
+                if self.stdout_thread.isAlive():
+                    raise TimeoutExpired(self.args)
+            if self.stderr is not None:
+                self.stderr_thread.join(self._remaining_time(endtime))
+                if self.stderr_thread.isAlive():
+                    raise TimeoutExpired(self.args)
+
+            # Collect the output from and close both pipes, now that we know
+            # both have been read successfully.
+            stdout = None
+            stderr = None
             if self.stdout:
-                stdout_thread.join()
+                stdout = self._stdout_buff
+                self.stdout.close()
             if self.stderr:
-                stderr_thread.join()
+                stderr = self._stderr_buff
+                self.stderr.close()
 
             # All data exchanged.  Translate lists into strings.
             if stdout is not None:
@@ -1035,7 +1117,6 @@
             if stderr is not None:
                 stderr = stderr[0]
 
-            self.wait()
             return (stdout, stderr)
 
         def send_signal(self, sig):
@@ -1365,25 +1446,52 @@
             return self.returncode
 
 
-        def wait(self):
+        def _try_wait(self, wait_flags):
+            try:
+                (pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
+            except OSError as e:
+                if e.errno != errno.ECHILD:
+                    raise
+                # This happens if SIGCLD is set to be ignored or waiting
+                # for child processes has otherwise been disabled for our
+                # process.  This child is dead, we can't get the status.
+                pid = self.pid
+                sts = 0
+            return (pid, sts)
+
+
+        def wait(self, timeout=None, endtime=None):
             """Wait for child process to terminate.  Returns returncode
             attribute."""
-            if self.returncode is None:
-                try:
-                    pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0)
-                except OSError as e:
-                    if e.errno != errno.ECHILD:
-                        raise
-                    # This happens if SIGCLD is set to be ignored or waiting
-                    # for child processes has otherwise been disabled for our
-                    # process.  This child is dead, we can't get the status.
-                    sts = 0
+            # If timeout was passed but not endtime, compute endtime in terms of
+            # timeout.
+            if endtime is None and timeout is not None:
+                endtime = time.time() + timeout
+            if self.returncode is not None:
+                return self.returncode
+            elif endtime is not None:
+                # Enter a busy loop if we have a timeout.  This busy loop was
+                # cribbed from Lib/threading.py in Thread.wait() at r71065.
+                delay = 0.0005 # 500 us -> initial delay of 1 ms
+                while True:
+                    (pid, sts) = self._try_wait(os.WNOHANG)
+                    assert pid == self.pid or pid == 0
+                    if pid == self.pid:
+                        self._handle_exitstatus(sts)
+                        break
+                    remaining = self._remaining_time(endtime)
+                    if remaining <= 0:
+                        raise TimeoutExpired(self.args)
+                    delay = min(delay * 2, remaining, .05)
+                    time.sleep(delay)
+            elif self.returncode is None:
+                (pid, sts) = self._try_wait(0)
                 self._handle_exitstatus(sts)
             return self.returncode
 
 
-        def _communicate(self, input):
-            if self.stdin:
+        def _communicate(self, input, endtime):
+            if self.stdin and not self._communication_started:
                 # Flush stdio buffer.  This might block, if the user has
                 # been writing to .stdin in an uncontrolled fashion.
                 self.stdin.flush()
@@ -1391,9 +1499,11 @@
                     self.stdin.close()
 
             if _has_poll:
-                stdout, stderr = self._communicate_with_poll(input)
+                stdout, stderr = self._communicate_with_poll(input, endtime)
             else:
-                stdout, stderr = self._communicate_with_select(input)
+                stdout, stderr = self._communicate_with_select(input, endtime)
+
+            self.wait(timeout=self._remaining_time(endtime))
 
             # All data exchanged.  Translate lists into strings.
             if stdout is not None:
@@ -1411,60 +1521,77 @@
                     stderr = self._translate_newlines(stderr,
                                                       self.stderr.encoding)
 
-            self.wait()
             return (stdout, stderr)
 
 
-        def _communicate_with_poll(self, input):
+        def _communicate_with_poll(self, input, endtime):
             stdout = None # Return
             stderr = None # Return
-            fd2file = {}
-            fd2output = {}
+
+            if not self._communication_started:
+                self._fd2file = {}
 
             poller = select.poll()
             def register_and_append(file_obj, eventmask):
                 poller.register(file_obj.fileno(), eventmask)
-                fd2file[file_obj.fileno()] = file_obj
+                self._fd2file[file_obj.fileno()] = file_obj
 
             def close_unregister_and_remove(fd):
                 poller.unregister(fd)
-                fd2file[fd].close()
-                fd2file.pop(fd)
+                self._fd2file[fd].close()
+                self._fd2file.pop(fd)
 
             if self.stdin and input:
                 register_and_append(self.stdin, select.POLLOUT)
 
+            # Only create this mapping if we haven't already.
+            if not self._communication_started:
+                self._fd2output = {}
+                if self.stdout:
+                    self._fd2output[self.stdout.fileno()] = []
+                if self.stderr:
+                    self._fd2output[self.stderr.fileno()] = []
+
             select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
             if self.stdout:
                 register_and_append(self.stdout, select_POLLIN_POLLPRI)
-                fd2output[self.stdout.fileno()] = stdout = []
+                stdout = self._fd2output[self.stdout.fileno()]
             if self.stderr:
                 register_and_append(self.stderr, select_POLLIN_POLLPRI)
-                fd2output[self.stderr.fileno()] = stderr = []
+                stderr = self._fd2output[self.stderr.fileno()]
 
-            input_offset = 0
-            while fd2file:
+            # Save the input here so that if we time out while communicating,
+            # we can continue sending input if we retry.
+            if self.stdin and self._input is None:
+                self._input_offset = 0
+                self._input = input
+                if self.universal_newlines:
+                    self._input = self._input.encode(self.stdin.encoding)
+
+            while self._fd2file:
                 try:
-                    ready = poller.poll()
+                    ready = poller.poll(self._remaining_time(endtime))
                 except select.error as e:
                     if e.args[0] == errno.EINTR:
                         continue
                     raise
+                self._check_timeout(endtime)
 
                 # XXX Rewrite these to use non-blocking I/O on the
                 # file objects; they are no longer using C stdio!
 
                 for fd, mode in ready:
                     if mode & select.POLLOUT:
-                        chunk = input[input_offset : input_offset + _PIPE_BUF]
-                        input_offset += os.write(fd, chunk)
-                        if input_offset >= len(input):
+                        chunk = self._input[self._input_offset :
+                                            self._input_offset + _PIPE_BUF]
+                        self._input_offset += os.write(fd, chunk)
+                        if self._input_offset >= len(self._input):
                             close_unregister_and_remove(fd)
                     elif mode & select_POLLIN_POLLPRI:
                         data = os.read(fd, 4096)
                         if not data:
                             close_unregister_and_remove(fd)
-                        fd2output[fd].append(data)
+                        self._fd2output[fd].append(data)
                     else:
                         # Ignore hang up or errors.
                         close_unregister_and_remove(fd)
@@ -1472,53 +1599,76 @@
             return (stdout, stderr)
 
 
-        def _communicate_with_select(self, input):
-            read_set = []
-            write_set = []
+        def _communicate_with_select(self, input, endtime):
+            if not self._communication_started:
+                self._read_set = []
+                self._write_set = []
+                if self.stdin and input:
+                    self._write_set.append(self.stdin)
+                if self.stdout:
+                    self._read_set.append(self.stdout)
+                if self.stderr:
+                    self._read_set.append(self.stderr)
+
+            if self.stdin and self._input is None:
+                self._input_offset = 0
+                self._input = input
+                if self.universal_newlines:
+                    self._input = self._input.encode(self.stdin.encoding)
+
             stdout = None # Return
             stderr = None # Return
 
-            if self.stdin and input:
-                write_set.append(self.stdin)
             if self.stdout:
-                read_set.append(self.stdout)
-                stdout = []
+                if not self._communication_started:
+                    self._stdout_buff = []
+                stdout = self._stdout_buff
             if self.stderr:
-                read_set.append(self.stderr)
-                stderr = []
+                if not self._communication_started:
+                    self._stderr_buff = []
+                stderr = self._stderr_buff
 
-            input_offset = 0
-            while read_set or write_set:
+            while self._read_set or self._write_set:
                 try:
-                    rlist, wlist, xlist = select.select(read_set, write_set, [])
+                    (rlist, wlist, xlist) = \
+                        select.select(self._read_set, self._write_set, [],
+                                      self._remaining_time(endtime))
                 except select.error as e:
                     if e.args[0] == errno.EINTR:
                         continue
                     raise
 
+                # According to the docs, returning three empty lists indicates
+                # that the timeout expired.
+                if not (rlist or wlist or xlist):
+                    raise TimeoutExpired(self.args)
+                # We also check what time it is ourselves for good measure.
+                self._check_timeout(endtime)
+
                 # XXX Rewrite these to use non-blocking I/O on the
                 # file objects; they are no longer using C stdio!
 
                 if self.stdin in wlist:
-                    chunk = input[input_offset : input_offset + _PIPE_BUF]
+                    chunk = self._input[self._input_offset :
+                                        self._input_offset + _PIPE_BUF]
                     bytes_written = os.write(self.stdin.fileno(), chunk)
-                    input_offset += bytes_written
-                    if input_offset >= len(input):
+                    self._input_offset += bytes_written
+                    if self._input_offset >= len(self._input):
                         self.stdin.close()
-                        write_set.remove(self.stdin)
+                        self._write_set.remove(self.stdin)
 
                 if self.stdout in rlist:
                     data = os.read(self.stdout.fileno(), 1024)
                     if not data:
                         self.stdout.close()
-                        read_set.remove(self.stdout)
+                        self._read_set.remove(self.stdout)
                     stdout.append(data)
 
                 if self.stderr in rlist:
                     data = os.read(self.stderr.fileno(), 1024)
                     if not data:
                         self.stderr.close()
-                        read_set.remove(self.stderr)
+                        self._read_set.remove(self.stderr)
                     stderr.append(data)
 
             return (stdout, stderr)
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -56,6 +56,8 @@
         # shutdown time.  That frustrates tests trying to check stderr produced
         # from a spawned Python process.
         actual = support.strip_python_stderr(stderr)
+        # strip_python_stderr also strips whitespace, so we do too.
+        expected = expected.strip()
         self.assertEqual(actual, expected, msg)
 
 
@@ -67,6 +69,15 @@
                               "import sys; sys.exit(47)"])
         self.assertEqual(rc, 47)
 
+    def test_call_timeout(self):
+        # call() function with timeout argument; we want to test that the child
+        # process gets killed when the timeout expires.  If the child isn't
+        # killed, this call will deadlock since subprocess.call waits for the
+        # child.
+        self.assertRaises(subprocess.TimeoutExpired, subprocess.call,
+                          [sys.executable, "-c", "while True: pass"],
+                          timeout=0.1)
+
     def test_check_call_zero(self):
         # check_call() function with zero return code
         rc = subprocess.check_call([sys.executable, "-c",
@@ -109,6 +120,18 @@
             self.fail("Expected ValueError when stdout arg supplied.")
         self.assertIn('stdout', c.exception.args[0])
 
+    def test_check_output_timeout(self):
+        # check_output() function with timeout arg
+        with self.assertRaises(subprocess.TimeoutExpired) as c:
+            output = subprocess.check_output(
+                    [sys.executable, "-c",
+                     "import sys; sys.stdout.write('BDFL')\n"
+                     "sys.stdout.flush()\n"
+                     "while True: pass"],
+                    timeout=0.5)
+            self.fail("Expected TimeoutExpired.")
+        self.assertEqual(c.exception.output, b'BDFL')
+
     def test_call_kwargs(self):
         # call() function with keyword args
         newenv = os.environ.copy()
@@ -366,6 +389,41 @@
         self.assertEqual(stdout, b"banana")
         self.assertStderrEqual(stderr, b"pineapple")
 
+    def test_communicate_timeout(self):
+        p = subprocess.Popen([sys.executable, "-c",
+                              'import sys,os,time;'
+                              'sys.stderr.write("pineapple\\n");'
+                              'time.sleep(1);'
+                              'sys.stderr.write("pear\\n");'
+                              'sys.stdout.write(sys.stdin.read())'],
+                             universal_newlines=True,
+                             stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana",
+                          timeout=0.3)
+        # Make sure we can keep waiting for it, and that we get the whole output
+        # after it completes.
+        (stdout, stderr) = p.communicate()
+        self.assertEqual(stdout, "banana")
+        self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n")
+
+    def test_communicate_timeout_large_ouput(self):
+        # Test an expiring timeout while the child is outputting lots of data.
+        p = subprocess.Popen([sys.executable, "-c",
+                              'import sys,os,time;'
+                              'sys.stdout.write("a" * (64 * 1024));'
+                              'time.sleep(0.2);'
+                              'sys.stdout.write("a" * (64 * 1024));'
+                              'time.sleep(0.2);'
+                              'sys.stdout.write("a" * (64 * 1024));'
+                              'time.sleep(0.2);'
+                              'sys.stdout.write("a" * (64 * 1024));'],
+                             stdout=subprocess.PIPE)
+        self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4)
+        (stdout, _) = p.communicate()
+        self.assertEqual(len(stdout), 4 * 64 * 1024)
+
     # Test for the fd leak reported in http://bugs.python.org/issue2791.
     def test_communicate_pipe_fd_leak(self):
         for stdin_pipe in (False, True):
@@ -561,6 +619,13 @@
         self.assertEqual(p.wait(), 0)
 
 
+    def test_wait_timeout(self):
+        p = subprocess.Popen([sys.executable,
+                              "-c", "import time; time.sleep(1)"])
+        self.assertRaises(subprocess.TimeoutExpired, p.wait, timeout=0.1)
+        self.assertEqual(p.wait(timeout=2), 0)
+
+
     def test_invalid_bufsize(self):
         # an invalid type of the bufsize argument should raise
         # TypeError.
diff --git a/PC/_subprocess.c b/PC/_subprocess.c
--- a/PC/_subprocess.c
+++ b/PC/_subprocess.c
@@ -682,6 +682,7 @@
     defint(d, "SW_HIDE", SW_HIDE);
     defint(d, "INFINITE", INFINITE);
     defint(d, "WAIT_OBJECT_0", WAIT_OBJECT_0);
+    defint(d, "WAIT_TIMEOUT", WAIT_TIMEOUT);
     defint(d, "CREATE_NEW_CONSOLE", CREATE_NEW_CONSOLE);
     defint(d, "CREATE_NEW_PROCESS_GROUP", CREATE_NEW_PROCESS_GROUP);
 

-- 
Repository URL: http://hg.python.org/cpython

