[Python-checkins] cpython (merge 3.4 -> default): (Merge 3.4) asyncio: sync with Tulip

victor.stinner python-checkins at python.org
Mon Aug 25 23:23:45 CEST 2014


http://hg.python.org/cpython/rev/63cabfde945f
changeset:   92237:63cabfde945f
parent:      92235:eb2564a37a5c
parent:      92236:877c8442b992
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Mon Aug 25 23:22:54 2014 +0200
summary:
  (Merge 3.4) asyncio: sync with Tulip

* PipeServer.close() now cancels the "accept pipe" future which cancels the
  overlapped operation.
* Fix _SelectorTransport.__repr__() if the transport was closed
* Fix debug log in BaseEventLoop.create_connection(): get the socket object
  from the transport because SSL transport closes the old socket and creates a
  new SSL socket object. Also remove the _SelectorSslTransport._rawsock
  attribute: it contained the closed socket (not very useful) and it was not
  used.
* Issue #22063: socket operations (sock_recv, sock_sendall, sock_connect,
  sock_accept) of the proactor event loop no longer raise an exception in
  debug mode if the socket is in blocking mode. Overlapped operations also
  work on blocking sockets.
* Fix unit tests in debug mode: mock a non-blocking socket for socket
  operations which now raise an exception if the socket is blocking.
* The _fatal_error() methods of _UnixReadPipeTransport and
  _UnixWritePipeTransport now log all exceptions in debug mode.
* Don't log expected errors in unit tests
* Tulip issue 200: _WaitHandleFuture._unregister_wait() now catches and logs
  exceptions.
* Tulip issue 200: Log errors in debug mode instead of simply ignoring them.

files:
  Lib/asyncio/base_events.py                    |   7 +
  Lib/asyncio/proactor_events.py                |  24 +++---
  Lib/asyncio/selector_events.py                |  31 ++++----
  Lib/asyncio/test_utils.py                     |   6 +
  Lib/asyncio/unix_events.py                    |  14 +++-
  Lib/asyncio/windows_events.py                 |  31 ++++++-
  Lib/test/test_asyncio/test_base_events.py     |   3 +
  Lib/test/test_asyncio/test_events.py          |  36 +++++----
  Lib/test/test_asyncio/test_selector_events.py |  24 +++--
  Lib/test/test_asyncio/test_subprocess.py      |   8 +-
  10 files changed, 120 insertions(+), 64 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -578,6 +578,9 @@
         transport, protocol = yield from self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
         if self._debug:
+            # Get the socket from the transport because SSL transport closes
+            # the old socket and creates a new SSL socket
+            sock = transport.get_extra_info('socket')
             logger.debug("%r connected to %s:%r: (%r, %r)",
                          sock, host, port, transport, protocol)
         return transport, protocol
@@ -725,6 +728,10 @@
                         sock = socket.socket(af, socktype, proto)
                     except socket.error:
                         # Assume it's a bad family/type/protocol combination.
+                        if self._debug:
+                            logger.warning('create_server() failed to create '
+                                           'socket.socket(%r, %r, %r)',
+                                           af, socktype, proto, exc_info=True)
                         continue
                     sockets.append(sock)
                     if reuse_address:
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -172,6 +172,9 @@
         except ConnectionAbortedError as exc:
             if not self._closing:
                 self._fatal_error(exc, 'Fatal read error on pipe transport')
+            elif self._loop.get_debug():
+                logger.debug("Read error on pipe transport while closing",
+                             exc_info=True)
         except ConnectionResetError as exc:
             self._force_close(exc)
         except OSError as exc:
@@ -324,12 +327,16 @@
         try:
             self._extra['sockname'] = sock.getsockname()
         except (socket.error, AttributeError):
-            pass
+            if self._loop.get_debug():
+                logger.warning("getsockname() failed on %r",
+                             sock, exc_info=True)
         if 'peername' not in self._extra:
             try:
                 self._extra['peername'] = sock.getpeername()
             except (socket.error, AttributeError):
-                pass
+                if self._loop.get_debug():
+                    logger.warning("getpeername() failed on %r",
+                                   sock, exc_info=True)
 
     def can_write_eof(self):
         return True
@@ -385,18 +392,12 @@
         self._selector = None
 
     def sock_recv(self, sock, n):
-        if self.get_debug() and sock.gettimeout() != 0:
-            raise ValueError("the socket must be non-blocking")
         return self._proactor.recv(sock, n)
 
     def sock_sendall(self, sock, data):
-        if self.get_debug() and sock.gettimeout() != 0:
-            raise ValueError("the socket must be non-blocking")
         return self._proactor.send(sock, data)
 
     def sock_connect(self, sock, address):
-        if self.get_debug() and sock.gettimeout() != 0:
-            raise ValueError("the socket must be non-blocking")
         try:
             base_events._check_resolved_address(sock, address)
         except ValueError as err:
@@ -407,8 +408,6 @@
             return self._proactor.connect(sock, address)
 
     def sock_accept(self, sock):
-        if self.get_debug() and sock.gettimeout() != 0:
-            raise ValueError("the socket must be non-blocking")
         return self._proactor.accept(sock)
 
     def _socketpair(self):
@@ -470,11 +469,14 @@
             except OSError as exc:
                 if sock.fileno() != -1:
                     self.call_exception_handler({
-                        'message': 'Accept failed',
+                        'message': 'Accept failed on a socket',
                         'exception': exc,
                         'socket': sock,
                     })
                     sock.close()
+                elif self._debug:
+                    logger.debug("Accept failed on socket %r",
+                                 sock, exc_info=True)
             except futures.CancelledError:
                 sock.close()
             else:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -450,22 +450,24 @@
 
     def __repr__(self):
         info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
-        polling = _test_selector_event(self._loop._selector,
-                                       self._sock_fd, selectors.EVENT_READ)
-        if polling:
-            info.append('read=polling')
-        else:
-            info.append('read=idle')
+        # test if the transport was closed
+        if self._loop is not None:
+            polling = _test_selector_event(self._loop._selector,
+                                           self._sock_fd, selectors.EVENT_READ)
+            if polling:
+                info.append('read=polling')
+            else:
+                info.append('read=idle')
 
-        polling = _test_selector_event(self._loop._selector,
-                                       self._sock_fd, selectors.EVENT_WRITE)
-        if polling:
-            state = 'polling'
-        else:
-            state = 'idle'
+            polling = _test_selector_event(self._loop._selector,
+                                           self._sock_fd, selectors.EVENT_WRITE)
+            if polling:
+                state = 'polling'
+            else:
+                state = 'idle'
 
-        bufsize = self.get_write_buffer_size()
-        info.append('write=<%s, bufsize=%s>' % (state, bufsize))
+            bufsize = self.get_write_buffer_size()
+            info.append('write=<%s, bufsize=%s>' % (state, bufsize))
         return '<%s>' % ' '.join(info)
 
     def abort(self):
@@ -689,7 +691,6 @@
 
         self._server_hostname = server_hostname
         self._waiter = waiter
-        self._rawsock = rawsock
         self._sslcontext = sslcontext
         self._paused = False
 
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -417,3 +417,9 @@
         yield
     finally:
         logger.setLevel(old_level)
+
+def mock_nonblocking_socket():
+    """Create a mock of a non-blocking socket."""
+    sock = mock.Mock(socket.socket)
+    sock.gettimeout.return_value = 0.0
+    return sock
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -336,7 +336,10 @@
 
     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
         # should be called by exception handler only
-        if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
+        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
             self._loop.call_exception_handler({
                 'message': message,
                 'exception': exc,
@@ -508,7 +511,10 @@
 
     def _fatal_error(self, exc, message='Fatal error on pipe transport'):
         # should be called by exception handler only
-        if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+            if self._loop.get_debug():
+                logger.debug("%r: %s", self, message, exc_info=True)
+        else:
             self._loop.call_exception_handler({
                 'message': message,
                 'exception': exc,
@@ -749,7 +755,9 @@
         except KeyError:  # pragma: no cover
             # May happen if .remove_child_handler() is called
             # after os.waitpid() returns.
-            pass
+            if self._loop.get_debug():
+                logger.warning("Child watcher got an unexpected pid: %r",
+                               pid, exc_info=True)
         else:
             callback(pid, returncode, *args)
 
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -111,10 +111,17 @@
             return
         try:
             _overlapped.UnregisterWait(self._wait_handle)
-        except OSError as e:
-            if e.winerror != _overlapped.ERROR_IO_PENDING:
-                raise
+        except OSError as exc:
             # ERROR_IO_PENDING is not an error, the wait was unregistered
+            if exc.winerror != _overlapped.ERROR_IO_PENDING:
+                context = {
+                    'message': 'Failed to unregister the wait handle',
+                    'exception': exc,
+                    'future': self,
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
         self._wait_handle = None
         self._iocp = None
         self._ov = None
@@ -145,6 +152,11 @@
     def __init__(self, address):
         self._address = address
         self._free_instances = weakref.WeakSet()
+        # initialize the pipe attribute before calling _server_pipe_handle()
+        # because this function can raise an exception and the destructor calls
+        # the close() method
+        self._pipe = None
+        self._accept_pipe_future = None
         self._pipe = self._server_pipe_handle(True)
 
     def _get_unconnected_pipe(self):
@@ -174,6 +186,9 @@
         return pipe
 
     def close(self):
+        if self._accept_pipe_future is not None:
+            self._accept_pipe_future.cancel()
+            self._accept_pipe_future = None
         # Close all instances which have not been connected to by a client.
         if self._address is not None:
             for pipe in self._free_instances:
@@ -216,7 +231,7 @@
     def start_serving_pipe(self, protocol_factory, address):
         server = PipeServer(address)
 
-        def loop(f=None):
+        def loop_accept_pipe(f=None):
             pipe = None
             try:
                 if f:
@@ -237,13 +252,17 @@
                         'pipe': pipe,
                     })
                     pipe.close()
+                elif self._debug:
+                    logger.warning("Accept pipe failed on pipe %r",
+                                   pipe, exc_info=True)
             except futures.CancelledError:
                 if pipe:
                     pipe.close()
             else:
-                f.add_done_callback(loop)
+                server._accept_pipe_future = f
+                f.add_done_callback(loop_accept_pipe)
 
-        self.call_soon(loop)
+        self.call_soon(loop_accept_pipe)
         return [server]
 
     @coroutine
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -792,6 +792,9 @@
         class _SelectorTransportMock:
             _sock = None
 
+            def get_extra_info(self, key):
+                return mock.Mock()
+
             def close(self):
                 self._sock.close()
 
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -27,6 +27,7 @@
 
 
 import asyncio
+from asyncio import proactor_events
 from asyncio import selector_events
 from asyncio import test_utils
 
@@ -383,22 +384,23 @@
         self.assertEqual(read, data)
 
     def _basetest_sock_client_ops(self, httpd, sock):
-        # in debug mode, socket operations must fail
-        # if the socket is not in blocking mode
-        self.loop.set_debug(True)
-        sock.setblocking(True)
-        with self.assertRaises(ValueError):
-            self.loop.run_until_complete(
-                self.loop.sock_connect(sock, httpd.address))
-        with self.assertRaises(ValueError):
-            self.loop.run_until_complete(
-                self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
-        with self.assertRaises(ValueError):
-            self.loop.run_until_complete(
-                self.loop.sock_recv(sock, 1024))
-        with self.assertRaises(ValueError):
-            self.loop.run_until_complete(
-                self.loop.sock_accept(sock))
+        if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
+            # in debug mode, socket operations must fail
+            # if the socket is not in blocking mode
+            self.loop.set_debug(True)
+            sock.setblocking(True)
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_connect(sock, httpd.address))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_recv(sock, 1024))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
+                    self.loop.sock_accept(sock))
 
         # test in non-blocking mode
         sock.setblocking(False)
@@ -1229,6 +1231,7 @@
                          "Don't support pipes for Windows")
     def test_write_pipe_disconnect_on_close(self):
         rsock, wsock = test_utils.socketpair()
+        rsock.setblocking(False)
         pipeobj = io.open(wsock.detach(), 'wb', 1024)
 
         proto = MyWritePipeProto(loop=self.loop)
@@ -1366,6 +1369,7 @@
             for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
                 sock = socket.socket(family, sock_type)
                 with sock:
+                    sock.setblocking(False)
                     connect = self.loop.sock_connect(sock, address)
                     with self.assertRaises(ValueError) as cm:
                         self.loop.run_until_complete(connect)
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -58,8 +58,9 @@
         self.loop.remove_reader = mock.Mock()
         self.loop.remove_writer = mock.Mock()
         waiter = asyncio.Future(loop=self.loop)
-        transport = self.loop._make_ssl_transport(
-            m, asyncio.Protocol(), m, waiter)
+        with test_utils.disable_logger():
+            transport = self.loop._make_ssl_transport(
+                m, asyncio.Protocol(), m, waiter)
         self.assertIsInstance(transport, _SelectorSslTransport)
 
     @mock.patch('asyncio.selector_events.ssl', None)
@@ -127,7 +128,8 @@
 
     def test_write_to_self_tryagain(self):
         self.loop._csock.send.side_effect = BlockingIOError
-        self.assertIsNone(self.loop._write_to_self())
+        with test_utils.disable_logger():
+            self.assertIsNone(self.loop._write_to_self())
 
     def test_write_to_self_exception(self):
         # _write_to_self() swallows OSError
@@ -135,7 +137,7 @@
         self.assertRaises(RuntimeError, self.loop._write_to_self)
 
     def test_sock_recv(self):
-        sock = mock.Mock()
+        sock = test_utils.mock_nonblocking_socket()
         self.loop._sock_recv = mock.Mock()
 
         f = self.loop.sock_recv(sock, 1024)
@@ -183,7 +185,7 @@
         self.assertIs(err, f.exception())
 
     def test_sock_sendall(self):
-        sock = mock.Mock()
+        sock = test_utils.mock_nonblocking_socket()
         self.loop._sock_sendall = mock.Mock()
 
         f = self.loop.sock_sendall(sock, b'data')
@@ -193,7 +195,7 @@
             self.loop._sock_sendall.call_args[0])
 
     def test_sock_sendall_nodata(self):
-        sock = mock.Mock()
+        sock = test_utils.mock_nonblocking_socket()
         self.loop._sock_sendall = mock.Mock()
 
         f = self.loop.sock_sendall(sock, b'')
@@ -295,7 +297,7 @@
             self.loop.add_writer.call_args[0])
 
     def test_sock_connect(self):
-        sock = mock.Mock()
+        sock = test_utils.mock_nonblocking_socket()
         self.loop._sock_connect = mock.Mock()
 
         f = self.loop.sock_connect(sock, ('127.0.0.1', 8080))
@@ -361,7 +363,7 @@
         self.assertIsInstance(f.exception(), OSError)
 
     def test_sock_accept(self):
-        sock = mock.Mock()
+        sock = test_utils.mock_nonblocking_socket()
         self.loop._sock_accept = mock.Mock()
 
         f = self.loop.sock_accept(sock)
@@ -782,7 +784,8 @@
         transport = _SelectorSocketTransport(
             self.loop, self.sock, self.protocol)
         transport._force_close = mock.Mock()
-        transport._read_ready()
+        with test_utils.disable_logger():
+            transport._read_ready()
         transport._force_close.assert_called_with(err)
 
     @mock.patch('logging.exception')
@@ -1219,7 +1222,8 @@
         err = self.sslsock.recv.side_effect = ConnectionResetError()
         transport = self._make_one()
         transport._force_close = mock.Mock()
-        transport._read_ready()
+        with test_utils.disable_logger():
+            transport._read_ready()
         transport._force_close.assert_called_with(err)
 
     def test_read_ready_recv_retry(self):
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -148,15 +148,17 @@
 
         coro = write_stdin(proc, large_data)
         # drain() must raise BrokenPipeError or ConnectionResetError
-        self.assertRaises((BrokenPipeError, ConnectionResetError),
-                          self.loop.run_until_complete, coro)
+        with test_utils.disable_logger():
+            self.assertRaises((BrokenPipeError, ConnectionResetError),
+                              self.loop.run_until_complete, coro)
         self.loop.run_until_complete(proc.wait())
 
     def test_communicate_ignore_broken_pipe(self):
         proc, large_data = self.prepare_broken_pipe_test()
 
         # communicate() must ignore BrokenPipeError when feeding stdin
-        self.loop.run_until_complete(proc.communicate(large_data))
+        with test_utils.disable_logger():
+            self.loop.run_until_complete(proc.communicate(large_data))
         self.loop.run_until_complete(proc.wait())
 
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list