[Python-checkins] cpython (3.4): asyncio: sync with Tulip

victor.stinner python-checkins at python.org
Mon Jul 14 18:37:30 CEST 2014


http://hg.python.org/cpython/rev/3027bf42e29c
changeset:   91677:3027bf42e29c
branch:      3.4
parent:      91675:b57b4e3b83ff
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Mon Jul 14 18:33:40 2014 +0200
summary:
  asyncio: sync with Tulip

* Tulip issue #184: Log subprocess events in debug mode

  - Log stdin, stdout and stderr transports and protocols
  - Log process identifier (pid)
  - Log connection of pipes
  - Log process exit
  - Log Process.communicate() tasks: feed stdin, read stdout and stderr
  - Add __repr__() method to many classes related to subprocesses


* Add BaseSubprocessTransport._pid attribute. Store the pid so it is still
  accessible after the process exited. It's more convenient for debugging.

* create_connection(): add the socket in the "connected to" debug log

* Clean up some docstrings and comments. Remove unused unimplemented
  _read_from_self().

files:
  Lib/asyncio/base_events.py                |  131 +++++++--
  Lib/asyncio/base_subprocess.py            |   40 ++-
  Lib/asyncio/streams.py                    |   12 +
  Lib/asyncio/subprocess.py                 |   26 +
  Lib/asyncio/unix_events.py                |   13 +-
  Lib/test/test_asyncio/test_base_events.py |    2 -
  6 files changed, 181 insertions(+), 43 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -1,7 +1,7 @@
 """Base implementation of event loop.
 
 The event loop can be broken up into a multiplexer (the part
-responsible for notifying us of IO events) and the event loop proper,
+responsible for notifying us of I/O events) and the event loop proper,
 which wraps a multiplexer with functionality for scheduling callbacks,
 immediately or at a given time in the future.
 
@@ -50,6 +50,15 @@
         return str(handle)
 
 
+def _format_pipe(fd):
+    if fd == subprocess.PIPE:
+        return '<pipe>'
+    elif fd == subprocess.STDOUT:
+        return '<stdout>'
+    else:
+        return repr(fd)
+
+
 class _StopError(BaseException):
     """Raised to stop the event loop."""
 
@@ -70,7 +79,7 @@
         type_mask |= socket.SOCK_NONBLOCK
     if hasattr(socket, 'SOCK_CLOEXEC'):
         type_mask |= socket.SOCK_CLOEXEC
-    # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
+    # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
     # already resolved.
     try:
         socket.getaddrinfo(host, port,
@@ -158,7 +167,8 @@
     def create_task(self, coro):
         """Schedule a coroutine object.
 
-        Return a task object."""
+        Return a task object.
+        """
         task = tasks.Task(coro, loop=self)
         if task._source_traceback:
             del task._source_traceback[-1]
@@ -197,12 +207,13 @@
         """Create subprocess transport."""
         raise NotImplementedError
 
-    def _read_from_self(self):
-        """XXX"""
-        raise NotImplementedError
+    def _write_to_self(self):
+        """Write a byte to self-pipe, to wake up the event loop.
 
-    def _write_to_self(self):
-        """XXX"""
+        This may be called from a different thread.
+
+        The subclass is responsible for implementing the self-pipe.
+        """
         raise NotImplementedError
 
     def _process_events(self, event_list):
@@ -233,7 +244,7 @@
 
         If the argument is a coroutine, it is wrapped in a Task.
 
-        XXX TBD: It would be disastrous to call run_until_complete()
+        WARNING: It would be disastrous to call run_until_complete()
         with the same coroutine twice -- it would wrap it in two
         different Tasks and that can't be good.
 
@@ -261,7 +272,7 @@
 
         Every callback scheduled before stop() is called will run.
         Callback scheduled after stop() is called won't.  However,
-        those callbacks will run if run() is called again later.
+        those callbacks will run if run_*() is called again later.
         """
         self.call_soon(_raise_stop_error)
 
@@ -274,7 +285,7 @@
         The event loop must not be running.
         """
         if self._running:
-            raise RuntimeError("cannot close a running event loop")
+            raise RuntimeError("Cannot close a running event loop")
         if self._closed:
             return
         if self._debug:
@@ -292,11 +303,16 @@
         return self._closed
 
     def is_running(self):
-        """Returns running status of event loop."""
+        """Returns True if the event loop is running."""
         return self._running
 
     def time(self):
-        """Return the time according to the event loop's clock."""
+        """Return the time according to the event loop's clock.
+
+        This is a float expressed in seconds since an epoch, but the
+        epoch, precision, accuracy and drift are unspecified and may
+        differ per event loop.
+        """
         return time.monotonic()
 
     def call_later(self, delay, callback, *args):
@@ -306,7 +322,7 @@
         can be used to cancel the call.
 
         The delay can be an int or float, expressed in seconds.  It is
-        always a relative time.
+        always relative to the current time.
 
         Each callback will be called exactly once.  If two callbacks
         are scheduled for exactly the same time, it undefined which
@@ -321,7 +337,10 @@
         return timer
 
     def call_at(self, when, callback, *args):
-        """Like call_later(), but uses an absolute time."""
+        """Like call_later(), but uses an absolute time.
+
+        Absolute time corresponds to the event loop's time() method.
+        """
         if coroutines.iscoroutinefunction(callback):
             raise TypeError("coroutines cannot be used with call_at()")
         if self._debug:
@@ -335,7 +354,7 @@
     def call_soon(self, callback, *args):
         """Arrange for a callback to be called as soon as possible.
 
-        This operates as a FIFO queue, callbacks are called in the
+        This operates as a FIFO queue: callbacks are called in the
         order in which they are registered.  Each callback will be
         called exactly once.
 
@@ -361,10 +380,10 @@
     def _assert_is_current_event_loop(self):
         """Asserts that this event loop is the current event loop.
 
-        Non-threadsafe methods of this class make this assumption and will
+        Non-thread-safe methods of this class make this assumption and will
         likely behave incorrectly when the assumption is violated.
 
-        Should only be called when (self._debug == True). The caller is
+        Should only be called when (self._debug == True).  The caller is
         responsible for checking this condition for performance reasons.
         """
         try:
@@ -373,11 +392,11 @@
             return
         if current is not self:
             raise RuntimeError(
-                "non-threadsafe operation invoked on an event loop other "
+                "Non-thread-safe operation invoked on an event loop other "
                 "than the current one")
 
     def call_soon_threadsafe(self, callback, *args):
-        """Like call_soon(), but thread safe."""
+        """Like call_soon(), but thread-safe."""
         handle = self._call_soon(callback, args, check_loop=False)
         if handle._source_traceback:
             del handle._source_traceback[-1]
@@ -386,7 +405,7 @@
 
     def run_in_executor(self, executor, callback, *args):
         if coroutines.iscoroutinefunction(callback):
-            raise TypeError("coroutines cannot be used with run_in_executor()")
+            raise TypeError("Coroutines cannot be used with run_in_executor()")
         if isinstance(callback, events.Handle):
             assert not args
             assert not isinstance(callback, events.TimerHandle)
@@ -416,13 +435,13 @@
         if flags:
             msg.append('flags=%r' % flags)
         msg = ', '.join(msg)
-        logger.debug('Get addresss info %s', msg)
+        logger.debug('Get address info %s', msg)
 
         t0 = self.time()
         addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
         dt = self.time() - t0
 
-        msg = ('Getting addresss info %s took %.3f ms: %r'
+        msg = ('Getting address info %s took %.3f ms: %r'
                % (msg, dt * 1e3, addrinfo))
         if dt >= self.slow_callback_duration:
             logger.info(msg)
@@ -559,8 +578,8 @@
         transport, protocol = yield from self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
         if self._debug:
-            logger.debug("connected to %s:%r: (%r, %r)",
-                         host, port, transport, protocol)
+            logger.debug("%r connected to %s:%r: (%r, %r)",
+                         sock, host, port, transport, protocol)
         return transport, protocol
 
     @coroutine
@@ -589,7 +608,7 @@
                 raise ValueError('unexpected address family')
             addr_pairs_info = (((family, proto), (None, None)),)
         else:
-            # join addresss by (family, protocol)
+            # join address by (family, protocol)
             addr_infos = collections.OrderedDict()
             for idx, addr in ((0, local_addr), (1, remote_addr)):
                 if addr is not None:
@@ -674,7 +693,7 @@
                       reuse_address=None):
         """Create a TCP server bound to host and port.
 
-        Return an Server object which can be used to stop the service.
+        Return a Server object which can be used to stop the service.
 
         This method is a coroutine.
         """
@@ -731,8 +750,7 @@
                         sock.close()
         else:
             if sock is None:
-                raise ValueError(
-                    'host and port was not specified and no sock specified')
+                raise ValueError('Neither host/port nor sock were specified')
             sockets = [sock]
 
         server = Server(self, sockets)
@@ -750,6 +768,9 @@
         waiter = futures.Future(loop=self)
         transport = self._make_read_pipe_transport(pipe, protocol, waiter)
         yield from waiter
+        if self._debug:
+            logger.debug('Read pipe %r connected: (%r, %r)',
+                         pipe.fileno(), transport, protocol)
         return transport, protocol
 
     @coroutine
@@ -758,8 +779,24 @@
         waiter = futures.Future(loop=self)
         transport = self._make_write_pipe_transport(pipe, protocol, waiter)
         yield from waiter
+        if self._debug:
+            logger.debug('Write pipe %r connected: (%r, %r)',
+                         pipe.fileno(), transport, protocol)
         return transport, protocol
 
+    def _log_subprocess(self, msg, stdin, stdout, stderr):
+        info = [msg]
+        if stdin is not None:
+            info.append('stdin=%s' % _format_pipe(stdin))
+        if stdout is not None and stderr == subprocess.STDOUT:
+            info.append('stdout=stderr=%s' % _format_pipe(stdout))
+        else:
+            if stdout is not None:
+                info.append('stdout=%s' % _format_pipe(stdout))
+            if stderr is not None:
+                info.append('stderr=%s' % _format_pipe(stderr))
+        logger.debug(' '.join(info))
+
     @coroutine
     def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
@@ -774,8 +811,15 @@
         if bufsize != 0:
             raise ValueError("bufsize must be 0")
         protocol = protocol_factory()
+        if self._debug:
+            # don't log parameters: they may contain sensitive information
+            # (password) and may be too long
+            debug_log = 'run shell command %r' % cmd
+            self._log_subprocess(debug_log, stdin, stdout, stderr)
         transport = yield from self._make_subprocess_transport(
             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
+        if self._debug:
+            logger.info('%s: %r' % (debug_log, transport))
         return transport, protocol
 
     @coroutine
@@ -796,9 +840,16 @@
                                 "a bytes or text string, not %s"
                                 % type(arg).__name__)
         protocol = protocol_factory()
+        if self._debug:
+            # don't log parameters: they may contain sensitive information
+            # (password) and may be too long
+            debug_log = 'execute program %r' % program
+            self._log_subprocess(debug_log, stdin, stdout, stderr)
         transport = yield from self._make_subprocess_transport(
             protocol, popen_args, False, stdin, stdout, stderr,
             bufsize, **kwargs)
+        if self._debug:
+            logger.info('%s: %r' % (debug_log, transport))
         return transport, protocol
 
     def set_exception_handler(self, handler):
@@ -808,7 +859,7 @@
         be set.
 
         If handler is a callable object, it should have a
-        matching signature to '(loop, context)', where 'loop'
+        signature matching '(loop, context)', where 'loop'
         will be a reference to the active event loop, 'context'
         will be a dict object (see `call_exception_handler()`
         documentation for details about context).
@@ -825,7 +876,7 @@
         handler is set, and can be called by a custom exception
         handler that wants to defer to the default behavior.
 
-        context parameter has the same meaning as in
+        The context parameter has the same meaning as in
         `call_exception_handler()`.
         """
         message = context.get('message')
@@ -854,10 +905,10 @@
         logger.error('\n'.join(log_lines), exc_info=exc_info)
 
     def call_exception_handler(self, context):
-        """Call the current event loop exception handler.
+        """Call the current event loop's exception handler.
 
-        context is a dict object containing the following keys
-        (new keys maybe introduced later):
+        The context argument is a dict containing the following keys:
+
         - 'message': Error message;
         - 'exception' (optional): Exception object;
         - 'future' (optional): Future instance;
@@ -866,8 +917,10 @@
         - 'transport' (optional): Transport instance;
         - 'socket' (optional): Socket instance.
 
-        Note: this method should not be overloaded in subclassed
-        event loops.  For any custom exception handling, use
+        New keys maybe introduced in the future.
+
+        Note: do not overload this method in an event loop subclass.
+        For custom exception handling, use the
         `set_exception_handler()` method.
         """
         if self._exception_handler is None:
@@ -892,7 +945,7 @@
                         'context': context,
                     })
                 except Exception:
-                    # Guard 'default_exception_handler' in case it's
+                    # Guard 'default_exception_handler' in case it is
                     # overloaded.
                     logger.error('Exception in default exception handler '
                                  'while handling an unexpected error '
@@ -900,7 +953,7 @@
                                  exc_info=True)
 
     def _add_callback(self, handle):
-        """Add a Handle to ready or scheduled."""
+        """Add a Handle to _scheduled (TimerHandle) or _ready."""
         assert isinstance(handle, events.Handle), 'A Handle is required here'
         if handle._cancelled:
             return
@@ -971,7 +1024,7 @@
         # Note: We run all currently scheduled callbacks, but not any
         # callbacks scheduled by callbacks run this time around --
         # they will be run the next time (after another I/O poll).
-        # Use an idiom that is threadsafe without using locks.
+        # Use an idiom that is thread-safe without using locks.
         ntodo = len(self._ready)
         for i in range(ntodo):
             handle = self._ready.popleft()
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -4,6 +4,7 @@
 from . import protocols
 from . import transports
 from .coroutines import coroutine
+from .log import logger
 
 
 class BaseSubprocessTransport(transports.SubprocessTransport):
@@ -14,6 +15,7 @@
         super().__init__(extra)
         self._protocol = protocol
         self._loop = loop
+        self._pid = None
 
         self._pipes = {}
         if stdin == subprocess.PIPE:
@@ -27,7 +29,36 @@
         self._returncode = None
         self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                     stderr=stderr, bufsize=bufsize, **kwargs)
+        self._pid = self._proc.pid
         self._extra['subprocess'] = self._proc
+        if self._loop.get_debug():
+            if isinstance(args, (bytes, str)):
+                program = args
+            else:
+                program = args[0]
+            logger.debug('process %r created: pid %s',
+                         program, self._pid)
+
+    def __repr__(self):
+        info = [self.__class__.__name__, 'pid=%s' % self._pid]
+        if self._returncode is not None:
+            info.append('returncode=%s' % self._returncode)
+
+        stdin = self._pipes.get(0)
+        if stdin is not None:
+            info.append('stdin=%s' % stdin.pipe)
+
+        stdout = self._pipes.get(1)
+        stderr = self._pipes.get(2)
+        if stdout is not None and stderr is stdout:
+            info.append('stdout=stderr=%s' % stdout.pipe)
+        else:
+            if stdout is not None:
+                info.append('stdout=%s' % stdout.pipe)
+            if stderr is not None:
+                info.append('stderr=%s' % stderr.pipe)
+
+        return '<%s>' % ' '.join(info)
 
     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
         raise NotImplementedError
@@ -45,7 +76,7 @@
             self.terminate()
 
     def get_pid(self):
-        return self._proc.pid
+        return self._pid
 
     def get_returncode(self):
         return self._returncode
@@ -108,6 +139,9 @@
     def _process_exited(self, returncode):
         assert returncode is not None, returncode
         assert self._returncode is None, self._returncode
+        if self._loop.get_debug():
+            logger.info('%r exited with return code %r',
+                        self, returncode)
         self._returncode = returncode
         self._call(self._protocol.process_exited)
         self._try_finish()
@@ -141,6 +175,10 @@
     def connection_made(self, transport):
         self.pipe = transport
 
+    def __repr__(self):
+        return ('<%s fd=%s pipe=%r>'
+                % (self.__class__.__name__, self.fd, self.pipe))
+
     def connection_lost(self, exc):
         self.disconnected = True
         self.proc._pipe_connection_lost(self.fd, exc)
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -15,6 +15,7 @@
 from . import futures
 from . import protocols
 from .coroutines import coroutine
+from .log import logger
 
 
 _DEFAULT_LIMIT = 2**16
@@ -153,10 +154,15 @@
     def pause_writing(self):
         assert not self._paused
         self._paused = True
+        if self._loop.get_debug():
+            logger.debug("%r pauses writing", self)
 
     def resume_writing(self):
         assert self._paused
         self._paused = False
+        if self._loop.get_debug():
+            logger.debug("%r resumes writing", self)
+
         waiter = self._drain_waiter
         if waiter is not None:
             self._drain_waiter = None
@@ -244,6 +250,12 @@
         self._reader = reader
         self._loop = loop
 
+    def __repr__(self):
+        info = [self.__class__.__name__, 'transport=%r' % self._transport]
+        if self._reader is not None:
+            info.append('reader=%r' % self._reader)
+        return '<%s>' % ' '.join(info)
+
     @property
     def transport(self):
         return self._transport
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -9,6 +9,7 @@
 from . import streams
 from . import tasks
 from .coroutines import coroutine
+from .log import logger
 
 
 PIPE = subprocess.PIPE
@@ -28,6 +29,16 @@
         self._waiters = collections.deque()
         self._transport = None
 
+    def __repr__(self):
+        info = [self.__class__.__name__]
+        if self.stdin is not None:
+            info.append('stdin=%r' % self.stdin)
+        if self.stdout is not None:
+            info.append('stdout=%r' % self.stdout)
+        if self.stderr is not None:
+            info.append('stderr=%r' % self.stderr)
+        return '<%s>' % ' '.join(info)
+
     def connection_made(self, transport):
         self._transport = transport
         if transport.get_pipe_transport(1):
@@ -91,6 +102,9 @@
         self.stderr = protocol.stderr
         self.pid = transport.get_pid()
 
+    def __repr__(self):
+        return '<%s %s>' % (self.__class__.__name__, self.pid)
+
     @property
     def returncode(self):
         return self._transport.get_returncode()
@@ -126,7 +140,13 @@
     @coroutine
     def _feed_stdin(self, input):
         self.stdin.write(input)
+        if self._loop.get_debug():
+            logger.debug('%r communicate: feed stdin (%s bytes)',
+                        self, len(input))
         yield from self.stdin.drain()
+
+        if self._loop.get_debug():
+            logger.debug('%r communicate: close stdin', self)
         self.stdin.close()
 
     @coroutine
@@ -141,7 +161,13 @@
         else:
             assert fd == 1
             stream = self.stdout
+        if self._loop.get_debug():
+            name = 'stdout' if fd == 1 else 'stderr'
+            logger.debug('%r communicate: read %s', self, name)
         output = yield from stream.read()
+        if self._loop.get_debug():
+            name = 'stdout' if fd == 1 else 'stderr'
+            logger.debug('%r communicate: close %s', self, name)
         transport.close()
         return output
 
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -565,7 +565,7 @@
         process 'pid' terminates. Specifying another callback for the same
         process replaces the previous handler.
 
-        Note: callback() must be thread-safe
+        Note: callback() must be thread-safe.
         """
         raise NotImplementedError()
 
@@ -721,6 +721,9 @@
                 return
 
             returncode = self._compute_returncode(status)
+            if self._loop.get_debug():
+                logger.debug('process %s exited with returncode %s',
+                             expected_pid, returncode)
 
         try:
             callback, args = self._callbacks.pop(pid)
@@ -818,8 +821,16 @@
                     if self._forks:
                         # It may not be registered yet.
                         self._zombies[pid] = returncode
+                        if self._loop.get_debug():
+                            logger.debug('unknown process %s exited '
+                                         'with returncode %s',
+                                         pid, returncode)
                         continue
                     callback = None
+                else:
+                    if self._loop.get_debug():
+                        logger.debug('process %s exited with returncode %s',
+                                     pid, returncode)
 
             if callback is None:
                 logger.warning(
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -44,8 +44,6 @@
         self.assertRaises(
             NotImplementedError, self.loop._write_to_self)
         self.assertRaises(
-            NotImplementedError, self.loop._read_from_self)
-        self.assertRaises(
             NotImplementedError,
             self.loop._make_read_pipe_transport, m, m)
         self.assertRaises(

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list