[Python-checkins] cpython (merge default -> default): Merge heads

serhiy.storchaka python-checkins at python.org
Sat Nov 2 09:48:41 CET 2013


http://hg.python.org/cpython/rev/e92bba5b53db
changeset:   86837:e92bba5b53db
parent:      86835:ab7c2c1d349c
parent:      86833:123804a72a8f
user:        Serhiy Storchaka <storchaka at gmail.com>
date:        Sat Nov 02 10:47:57 2013 +0200
summary:
  Merge heads

files:
  Lib/asyncio/base_events.py                    |   30 +-
  Lib/asyncio/constants.py                      |    5 +-
  Lib/asyncio/events.py                         |   15 +-
  Lib/asyncio/selector_events.py                |  154 ++++++---
  Lib/asyncio/tasks.py                          |    5 +-
  Lib/asyncio/windows_events.py                 |   16 +
  Lib/asyncio/windows_utils.py                  |   20 +-
  Lib/test/test_asyncio/test_base_events.py     |   78 ++++-
  Lib/test/test_asyncio/test_events.py          |    3 +-
  Lib/test/test_asyncio/test_selector_events.py |  154 ++++++---
  Lib/test/test_asyncio/test_windows_events.py  |    2 +-
  11 files changed, 350 insertions(+), 132 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -186,6 +186,11 @@
         self.call_soon(_raise_stop_error)
 
     def close(self):
+        """Close the event loop.
+
+        This clears the queues and shuts down the executor,
+        but does not wait for the executor to finish.
+        """
         self._ready.clear()
         self._scheduled.clear()
         executor = self._default_executor
@@ -275,8 +280,27 @@
     @tasks.coroutine
     def create_connection(self, protocol_factory, host=None, port=None, *,
                           ssl=None, family=0, proto=0, flags=0, sock=None,
-                          local_addr=None):
+                          local_addr=None, server_hostname=None):
         """XXX"""
+        if server_hostname is not None and not ssl:
+            raise ValueError('server_hostname is only meaningful with ssl')
+
+        if server_hostname is None and ssl:
+            # Use host as default for server_hostname.  It is an error
+            # if host is empty or not set, e.g. when an
+            # already-connected socket was passed or when only a port
+            # is given.  To avoid this error, you can pass
+            # server_hostname='' -- this will bypass the hostname
+            # check.  (This also means that if host is a numeric
+            # IP/IPv6 address, we will attempt to verify that exact
+            # address; this will probably fail, but it is possible to
+            # create a certificate for a specific IP address, so we
+            # don't judge it here.)
+            if not host:
+                raise ValueError('You must set server_hostname '
+                                 'when using ssl without a host')
+            server_hostname = host
+
         if host is not None or port is not None:
             if sock is not None:
                 raise ValueError(
@@ -357,7 +381,7 @@
             sslcontext = None if isinstance(ssl, bool) else ssl
             transport = self._make_ssl_transport(
                 sock, protocol, sslcontext, waiter,
-                server_side=False, server_hostname=host)
+                server_side=False, server_hostname=server_hostname)
         else:
             transport = self._make_socket_transport(sock, protocol, waiter)
 
@@ -442,6 +466,8 @@
                       ssl=None,
                       reuse_address=None):
         """XXX"""
+        if isinstance(ssl, bool):
+            raise TypeError('ssl argument must be an SSLContext or None')
         if host is not None or port is not None:
             if sock is not None:
                 raise ValueError(
diff --git a/Lib/asyncio/constants.py b/Lib/asyncio/constants.py
--- a/Lib/asyncio/constants.py
+++ b/Lib/asyncio/constants.py
@@ -1,4 +1,7 @@
 """Constants."""
 
+# After the connection is lost, log warnings after this many write()s.
+LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
 
-LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
+# Seconds to wait before retrying accept().
+ACCEPT_RETRY_DELAY = 1
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -137,6 +137,17 @@
         """Return whether the event loop is currently running."""
         raise NotImplementedError
 
+    def close(self):
+        """Close the loop.
+
+        The loop should not be running.
+
+        This is idempotent and irreversible.
+
+        No other methods should be called after this one.
+        """
+        raise NotImplementedError
+
     # Methods scheduling callbacks.  All these return Handles.
 
     def call_soon(self, callback, *args):
@@ -172,7 +183,7 @@
 
     def create_connection(self, protocol_factory, host=None, port=None, *,
                           ssl=None, family=0, proto=0, flags=0, sock=None,
-                          local_addr=None):
+                          local_addr=None, server_hostname=None):
         raise NotImplementedError
 
     def create_server(self, protocol_factory, host=None, port=None, *,
@@ -214,6 +225,8 @@
                                  family=0, proto=0, flags=0):
         raise NotImplementedError
 
+    # Pipes and subprocesses.
+
     def connect_read_pipe(self, protocol_factory, pipe):
         """Register read pipe in eventloop.
 
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -5,6 +5,7 @@
 """
 
 import collections
+import errno
 import socket
 try:
     import ssl
@@ -89,28 +90,37 @@
         except (BlockingIOError, InterruptedError):
             pass
 
-    def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
+    def _start_serving(self, protocol_factory, sock,
+                       sslcontext=None, server=None):
         self.add_reader(sock.fileno(), self._accept_connection,
-                        protocol_factory, sock, ssl, server)
+                        protocol_factory, sock, sslcontext, server)
 
-    def _accept_connection(self, protocol_factory, sock, ssl=None,
-                           server=None):
+    def _accept_connection(self, protocol_factory, sock,
+                           sslcontext=None, server=None):
         try:
             conn, addr = sock.accept()
             conn.setblocking(False)
-        except (BlockingIOError, InterruptedError):
+        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
             pass  # False alarm.
-        except Exception:
-            # Bad error. Stop serving.
-            self.remove_reader(sock.fileno())
-            sock.close()
+        except OSError as exc:
             # There's nowhere to send the error, so just log it.
             # TODO: Someone will want an error handler for this.
-            logger.exception('Accept failed')
+            if exc.errno in (errno.EMFILE, errno.ENFILE,
+                             errno.ENOBUFS, errno.ENOMEM):
+                # Some platforms (e.g. Linux) keep reporting the FD as
+                # ready, so we remove the read handler temporarily.
+                # We'll try again in a while.
+                logger.exception('Accept out of system resource (%s)', exc)
+                self.remove_reader(sock.fileno())
+                self.call_later(constants.ACCEPT_RETRY_DELAY,
+                                self._start_serving,
+                                protocol_factory, sock, sslcontext, server)
+            else:
+                raise  # The event loop will catch, log and ignore it.
         else:
-            if ssl:
+            if sslcontext:
                 self._make_ssl_transport(
-                    conn, protocol_factory(), ssl, None,
+                    conn, protocol_factory(), sslcontext, None,
                     server_side=True, extra={'peername': addr}, server=server)
             else:
                 self._make_socket_transport(
@@ -277,7 +287,7 @@
                 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                 if err != 0:
                     # Jump to the except clause below.
-                    raise OSError(err, 'Connect call failed')
+                    raise OSError(err, 'Connect call failed %s' % (address,))
         except (BlockingIOError, InterruptedError):
             self.add_writer(fd, self._sock_connect, fut, True, sock, address)
         except Exception as exc:
@@ -404,15 +414,16 @@
             try:
                 self._protocol.pause_writing()
             except Exception:
-                tulip_log.exception('pause_writing() failed')
+                logger.exception('pause_writing() failed')
 
     def _maybe_resume_protocol(self):
-        if self._protocol_paused and self.get_write_buffer_size() <= self._low_water:
+        if (self._protocol_paused and
+            self.get_write_buffer_size() <= self._low_water):
             self._protocol_paused = False
             try:
                 self._protocol.resume_writing()
             except Exception:
-                tulip_log.exception('resume_writing() failed')
+                logger.exception('resume_writing() failed')
 
     def set_write_buffer_limits(self, high=None, low=None):
         if high is None:
@@ -548,22 +559,28 @@
     def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                  server_side=False, server_hostname=None,
                  extra=None, server=None):
+        if ssl is None:
+            raise RuntimeError('stdlib ssl module not available')
+
         if server_side:
-            assert isinstance(
-                sslcontext, ssl.SSLContext), 'Must pass an SSLContext'
+            if not sslcontext:
+                raise ValueError('Server side ssl needs a valid SSLContext')
         else:
-            # Client-side may pass ssl=True to use a default context.
-            # The default is the same as used by urllib.
-            if sslcontext is None:
+            if not sslcontext:
+                # Client side may pass ssl=True to use a default
+                # context; in that case the sslcontext passed is None.
+                # The default is the same as used by urllib with
+                # cadefault=True.
                 sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                 sslcontext.options |= ssl.OP_NO_SSLv2
                 sslcontext.set_default_verify_paths()
                 sslcontext.verify_mode = ssl.CERT_REQUIRED
+
         wrap_kwargs = {
             'server_side': server_side,
             'do_handshake_on_connect': False,
         }
-        if server_hostname is not None and not server_side and ssl.HAS_SNI:
+        if server_hostname and not server_side and ssl.HAS_SNI:
             wrap_kwargs['server_hostname'] = server_hostname
         sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
 
@@ -609,7 +626,7 @@
 
         # Verify hostname if requested.
         peercert = self._sock.getpeercert()
-        if (self._server_hostname is not None and
+        if (self._server_hostname and
             self._sslcontext.verify_mode != ssl.CERT_NONE):
             try:
                 ssl.match_hostname(peercert, self._server_hostname)
@@ -625,15 +642,16 @@
                            compression=self._sock.compression(),
                            )
 
-        self._loop.add_reader(self._sock_fd, self._on_ready)
-        self._loop.add_writer(self._sock_fd, self._on_ready)
+        self._read_wants_write = False
+        self._write_wants_read = False
+        self._loop.add_reader(self._sock_fd, self._read_ready)
         self._loop.call_soon(self._protocol.connection_made, self)
         if self._waiter is not None:
             self._loop.call_soon(self._waiter.set_result, None)
 
     def pause_reading(self):
         # XXX This is a bit icky, given the comment at the top of
-        # _on_ready().  Is it possible to evoke a deadlock?  I don't
+        # _read_ready().  Is it possible to evoke a deadlock?  I don't
         # know, although it doesn't look like it; write() will still
         # accept more data for the buffer and eventually the app will
         # call resume_reading() again, and things will flow again.
@@ -648,41 +666,58 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._on_ready)
+        self._loop.add_reader(self._sock_fd, self._read_ready)
 
-    def _on_ready(self):
-        # Because of renegotiations (?), there's no difference between
-        # readable and writable.  We just try both.  XXX This may be
-        # incorrect; we probably need to keep state about what we
-        # should do next.
+    def _read_ready(self):
+        if self._write_wants_read:
+            self._write_wants_read = False
+            self._write_ready()
 
-        # First try reading.
-        if not self._closing and not self._paused:
-            try:
-                data = self._sock.recv(self.max_size)
-            except (BlockingIOError, InterruptedError,
-                    ssl.SSLWantReadError, ssl.SSLWantWriteError):
-                pass
-            except Exception as exc:
-                self._fatal_error(exc)
+            if self._buffer:
+                self._loop.add_writer(self._sock_fd, self._write_ready)
+
+        try:
+            data = self._sock.recv(self.max_size)
+        except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
+            pass
+        except ssl.SSLWantWriteError:
+            self._read_wants_write = True
+            self._loop.remove_reader(self._sock_fd)
+            self._loop.add_writer(self._sock_fd, self._write_ready)
+        except Exception as exc:
+            self._fatal_error(exc)
+        else:
+            if data:
+                self._protocol.data_received(data)
             else:
-                if data:
-                    self._protocol.data_received(data)
-                else:
-                    try:
-                        self._protocol.eof_received()
-                    finally:
-                        self.close()
+                try:
+                    keep_open = self._protocol.eof_received()
+                    if keep_open:
+                        logger.warning('returning true from eof_received() '
+                                       'has no effect when using ssl')
+                finally:
+                    self.close()
 
-        # Now try writing, if there's anything to write.
+    def _write_ready(self):
+        if self._read_wants_write:
+            self._read_wants_write = False
+            self._read_ready()
+
+            if not (self._paused or self._closing):
+                self._loop.add_reader(self._sock_fd, self._read_ready)
+
         if self._buffer:
             data = b''.join(self._buffer)
             self._buffer.clear()
             try:
                 n = self._sock.send(data)
             except (BlockingIOError, InterruptedError,
-                    ssl.SSLWantReadError, ssl.SSLWantWriteError):
+                    ssl.SSLWantWriteError):
                 n = 0
+            except ssl.SSLWantReadError:
+                n = 0
+                self._loop.remove_writer(self._sock_fd)
+                self._write_wants_read = True
             except Exception as exc:
                 self._loop.remove_writer(self._sock_fd)
                 self._fatal_error(exc)
@@ -691,11 +726,12 @@
             if n < len(data):
                 self._buffer.append(data[n:])
 
-            self._maybe_resume_protocol()  # May append to buffer.
+        self._maybe_resume_protocol()  # May append to buffer.
 
-        if self._closing and not self._buffer:
+        if not self._buffer:
             self._loop.remove_writer(self._sock_fd)
-            self._call_connection_lost(None)
+            if self._closing:
+                self._call_connection_lost(None)
 
     def write(self, data):
         assert isinstance(data, bytes), repr(type(data))
@@ -708,20 +744,16 @@
             self._conn_lost += 1
             return
 
-        # We could optimize, but the callback can do this for now.
+        if not self._buffer:
+            self._loop.add_writer(self._sock_fd, self._write_ready)
+
+        # Add it to the buffer.
         self._buffer.append(data)
         self._maybe_pause_protocol()
 
     def can_write_eof(self):
         return False
 
-    def close(self):
-        if self._closing:
-            return
-        self._closing = True
-        self._conn_lost += 1
-        self._loop.remove_reader(self._sock_fd)
-
 
 class _SelectorDatagramTransport(_SelectorTransport):
 
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -62,8 +62,9 @@
             code = func.__code__
             filename = code.co_filename
             lineno = code.co_firstlineno
-            logger.error('Coroutine %r defined at %s:%s was never yielded from',
-                         func.__name__, filename, lineno)
+            logger.error(
+                'Coroutine %r defined at %s:%s was never yielded from',
+                func.__name__, filename, lineno)
 
 
 def coroutine(func):
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -138,6 +138,7 @@
     @tasks.coroutine
     def start_serving_pipe(self, protocol_factory, address):
         server = PipeServer(address)
+
         def loop(f=None):
             pipe = None
             try:
@@ -160,6 +161,7 @@
                     pipe.close()
             else:
                 f.add_done_callback(loop)
+
         self.call_soon(loop)
         return [server]
 
@@ -209,6 +211,7 @@
             ov.WSARecv(conn.fileno(), nbytes, flags)
         else:
             ov.ReadFile(conn.fileno(), nbytes)
+
         def finish(trans, key, ov):
             try:
                 return ov.getresult()
@@ -217,6 +220,7 @@
                     raise ConnectionResetError(*exc.args)
                 else:
                     raise
+
         return self._register(ov, conn, finish)
 
     def send(self, conn, buf, flags=0):
@@ -226,6 +230,7 @@
             ov.WSASend(conn.fileno(), buf, flags)
         else:
             ov.WriteFile(conn.fileno(), buf)
+
         def finish(trans, key, ov):
             try:
                 return ov.getresult()
@@ -234,6 +239,7 @@
                     raise ConnectionResetError(*exc.args)
                 else:
                     raise
+
         return self._register(ov, conn, finish)
 
     def accept(self, listener):
@@ -241,6 +247,7 @@
         conn = self._get_accept_socket(listener.family)
         ov = _overlapped.Overlapped(NULL)
         ov.AcceptEx(listener.fileno(), conn.fileno())
+
         def finish_accept(trans, key, ov):
             ov.getresult()
             # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
@@ -249,6 +256,7 @@
                             _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
             conn.settimeout(listener.gettimeout())
             return conn, conn.getpeername()
+
         return self._register(ov, listener, finish_accept)
 
     def connect(self, conn, address):
@@ -264,26 +272,31 @@
                 raise
         ov = _overlapped.Overlapped(NULL)
         ov.ConnectEx(conn.fileno(), address)
+
         def finish_connect(trans, key, ov):
             ov.getresult()
             # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
             conn.setsockopt(socket.SOL_SOCKET,
                             _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
             return conn
+
         return self._register(ov, conn, finish_connect)
 
     def accept_pipe(self, pipe):
         self._register_with_iocp(pipe)
         ov = _overlapped.Overlapped(NULL)
         ov.ConnectNamedPipe(pipe.fileno())
+
         def finish(trans, key, ov):
             ov.getresult()
             return pipe
+
         return self._register(ov, pipe, finish)
 
     def connect_pipe(self, address):
         ov = _overlapped.Overlapped(NULL)
         ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
+
         def finish(err, handle, ov):
             # err, handle were arguments passed to PostQueuedCompletionStatus()
             # in a function run in a thread pool.
@@ -296,6 +309,7 @@
                 raise OSError(0, msg, None, err)
             else:
                 return windows_utils.PipeHandle(handle)
+
         return self._register(ov, None, finish, wait_for_post=True)
 
     def wait_for_handle(self, handle, timeout=None):
@@ -432,8 +446,10 @@
         self._proc = windows_utils.Popen(
             args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
             bufsize=bufsize, **kwargs)
+
         def callback(f):
             returncode = self._proc.poll()
             self._process_exited(returncode)
+
         f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
         f.add_done_callback(callback)
diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py
--- a/Lib/asyncio/windows_utils.py
+++ b/Lib/asyncio/windows_utils.py
@@ -18,18 +18,18 @@
 
 __all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
 
-#
+
 # Constants/globals
-#
+
 
 BUFSIZE = 8192
 PIPE = subprocess.PIPE
 STDOUT = subprocess.STDOUT
 _mmap_counter = itertools.count()
 
-#
+
 # Replacement for socket.socketpair()
-#
+
 
 def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
     """A socket pair usable as a self-pipe, for Windows.
@@ -57,9 +57,9 @@
     lsock.close()
     return (ssock, csock)
 
-#
+
 # Replacement for os.pipe() using handles instead of fds
-#
+
 
 def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
     """Like os.pipe() but with overlapped support and using handles not fds."""
@@ -105,9 +105,9 @@
             _winapi.CloseHandle(h2)
         raise
 
-#
+
 # Wrapper for a pipe handle
-#
+
 
 class PipeHandle:
     """Wrapper for an overlapped pipe handle which is vaguely file-object like.
@@ -137,9 +137,9 @@
     def __exit__(self, t, v, tb):
         self.close()
 
-#
+
 # Replacement for subprocess.Popen using overlapped pipe handles
-#
+
 
 class Popen(subprocess.Popen):
     """Replacement for subprocess.Popen using overlapped pipe handles.
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1,5 +1,6 @@
 """Tests for base_events.py"""
 
+import errno
 import logging
 import socket
 import time
@@ -8,6 +9,7 @@
 from test.support import find_unused_port, IPV6_ENABLED
 
 from asyncio import base_events
+from asyncio import constants
 from asyncio import events
 from asyncio import futures
 from asyncio import protocols
@@ -442,6 +444,71 @@
         self.assertRaises(
             OSError, self.loop.run_until_complete, coro)
 
+    def test_create_connection_ssl_server_hostname_default(self):
+        self.loop.getaddrinfo = unittest.mock.Mock()
+
+        def mock_getaddrinfo(*args, **kwds):
+            f = futures.Future(loop=self.loop)
+            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
+                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
+            return f
+
+        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
+        self.loop.sock_connect = unittest.mock.Mock()
+        self.loop.sock_connect.return_value = ()
+        self.loop._make_ssl_transport = unittest.mock.Mock()
+
+        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
+                                    **kwds):
+            waiter.set_result(None)
+
+        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
+        ANY = unittest.mock.ANY
+        # First try the default server_hostname.
+        self.loop._make_ssl_transport.reset_mock()
+        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True)
+        self.loop.run_until_complete(coro)
+        self.loop._make_ssl_transport.assert_called_with(
+            ANY, ANY, ANY, ANY,
+            server_side=False,
+            server_hostname='python.org')
+        # Next try an explicit server_hostname.
+        self.loop._make_ssl_transport.reset_mock()
+        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
+                                           server_hostname='perl.com')
+        self.loop.run_until_complete(coro)
+        self.loop._make_ssl_transport.assert_called_with(
+            ANY, ANY, ANY, ANY,
+            server_side=False,
+            server_hostname='perl.com')
+        # Finally try an explicit empty server_hostname.
+        self.loop._make_ssl_transport.reset_mock()
+        coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
+                                           server_hostname='')
+        self.loop.run_until_complete(coro)
+        self.loop._make_ssl_transport.assert_called_with(ANY, ANY, ANY, ANY,
+                                                         server_side=False,
+                                                         server_hostname='')
+
+    def test_create_connection_no_ssl_server_hostname_errors(self):
+        # When not using ssl, server_hostname must be None.
+        coro = self.loop.create_connection(MyProto, 'python.org', 80,
+                                           server_hostname='')
+        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+        coro = self.loop.create_connection(MyProto, 'python.org', 80,
+                                           server_hostname='python.org')
+        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+
+    def test_create_connection_ssl_server_hostname_errors(self):
+        # When using ssl, server_hostname may be None if host is non-empty.
+        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
+        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
+        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+        coro = self.loop.create_connection(MyProto, None, None,
+                                           ssl=True, sock=socket.socket())
+        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+
     def test_create_server_empty_host(self):
         # if host is empty string use None instead
         host = object()
@@ -585,11 +652,18 @@
     def test_accept_connection_exception(self, m_log):
         sock = unittest.mock.Mock()
         sock.fileno.return_value = 10
-        sock.accept.side_effect = OSError()
+        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
+        self.loop.remove_reader = unittest.mock.Mock()
+        self.loop.call_later = unittest.mock.Mock()
 
         self.loop._accept_connection(MyProto, sock)
-        self.assertTrue(sock.close.called)
         self.assertTrue(m_log.exception.called)
+        self.assertFalse(sock.close.called)
+        self.loop.remove_reader.assert_called_with(10)
+        self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
+                                                # self.loop._start_serving
+                                                unittest.mock.ANY,
+                                                MyProto, sock, None, None)
 
 
 if __name__ == '__main__':
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1276,7 +1276,6 @@
         def create_event_loop(self):
             return windows_events.SelectorEventLoop()
 
-
     class ProactorEventLoopTests(EventLoopTestsMixin,
                                  SubprocessTestsMixin,
                                  unittest.TestCase):
@@ -1472,6 +1471,8 @@
         self.assertRaises(
             NotImplementedError, loop.is_running)
         self.assertRaises(
+            NotImplementedError, loop.close)
+        self.assertRaises(
             NotImplementedError, loop.call_later, None, None)
         self.assertRaises(
             NotImplementedError, loop.call_at, f, f)
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -43,6 +43,7 @@
         self.assertIsInstance(
             self.loop._make_socket_transport(m, m), _SelectorSocketTransport)
 
+    @unittest.skipIf(ssl is None, 'No ssl module')
     def test_make_ssl_transport(self):
         m = unittest.mock.Mock()
         self.loop.add_reader = unittest.mock.Mock()
@@ -52,6 +53,16 @@
         self.assertIsInstance(
             self.loop._make_ssl_transport(m, m, m, m), _SelectorSslTransport)
 
+    @unittest.mock.patch('asyncio.selector_events.ssl', None)
+    def test_make_ssl_transport_without_ssl_error(self):
+        m = unittest.mock.Mock()
+        self.loop.add_reader = unittest.mock.Mock()
+        self.loop.add_writer = unittest.mock.Mock()
+        self.loop.remove_reader = unittest.mock.Mock()
+        self.loop.remove_writer = unittest.mock.Mock()
+        with self.assertRaises(RuntimeError):
+            self.loop._make_ssl_transport(m, m, m, m)
+
     def test_close(self):
         ssock = self.loop._ssock
         ssock.fileno.return_value = 7
@@ -1003,8 +1014,7 @@
             self.loop, self.sock, self.protocol, self.sslcontext,
             waiter=waiter)
         self.assertTrue(self.sslsock.do_handshake.called)
-        self.loop.assert_reader(1, tr._on_ready)
-        self.loop.assert_writer(1, tr._on_ready)
+        self.loop.assert_reader(1, tr._read_ready)
         test_utils.run_briefly(self.loop)
         self.assertIsNone(waiter.result())
 
@@ -1047,13 +1057,13 @@
     def test_pause_resume_reading(self):
         tr = self._make_one()
         self.assertFalse(tr._paused)
-        self.loop.assert_reader(1, tr._on_ready)
+        self.loop.assert_reader(1, tr._read_ready)
         tr.pause_reading()
         self.assertTrue(tr._paused)
         self.assertFalse(1 in self.loop.readers)
         tr.resume_reading()
         self.assertFalse(tr._paused)
-        self.loop.assert_reader(1, tr._on_ready)
+        self.loop.assert_reader(1, tr._read_ready)
 
     def test_write_no_data(self):
         transport = self._make_one()
@@ -1084,140 +1094,173 @@
         transport.write(b'data')
         m_log.warning.assert_called_with('socket.send() raised exception.')
 
-    def test_on_ready_recv(self):
+    def test_read_ready_recv(self):
         self.sslsock.recv.return_value = b'data'
         transport = self._make_one()
-        transport._on_ready()
+        transport._read_ready()
         self.assertTrue(self.sslsock.recv.called)
         self.assertEqual((b'data',), self.protocol.data_received.call_args[0])
 
-    def test_on_ready_recv_eof(self):
+    def test_read_ready_write_wants_read(self):
+        self.loop.add_writer = unittest.mock.Mock()
+        self.sslsock.recv.side_effect = BlockingIOError
+        transport = self._make_one()
+        transport._write_wants_read = True
+        transport._write_ready = unittest.mock.Mock()
+        transport._buffer.append(b'data')
+        transport._read_ready()
+
+        self.assertFalse(transport._write_wants_read)
+        transport._write_ready.assert_called_with()
+        self.loop.add_writer.assert_called_with(
+            transport._sock_fd, transport._write_ready)
+
+    def test_read_ready_recv_eof(self):
         self.sslsock.recv.return_value = b''
         transport = self._make_one()
         transport.close = unittest.mock.Mock()
-        transport._on_ready()
+        transport._read_ready()
         transport.close.assert_called_with()
         self.protocol.eof_received.assert_called_with()
 
-    def test_on_ready_recv_conn_reset(self):
+    def test_read_ready_recv_conn_reset(self):
         err = self.sslsock.recv.side_effect = ConnectionResetError()
         transport = self._make_one()
         transport._force_close = unittest.mock.Mock()
-        transport._on_ready()
+        transport._read_ready()
         transport._force_close.assert_called_with(err)
 
-    def test_on_ready_recv_retry(self):
+    def test_read_ready_recv_retry(self):
         self.sslsock.recv.side_effect = ssl.SSLWantReadError
         transport = self._make_one()
-        transport._on_ready()
+        transport._read_ready()
         self.assertTrue(self.sslsock.recv.called)
         self.assertFalse(self.protocol.data_received.called)
 
-        self.sslsock.recv.side_effect = ssl.SSLWantWriteError
-        transport._on_ready()
-        self.assertFalse(self.protocol.data_received.called)
-
         self.sslsock.recv.side_effect = BlockingIOError
-        transport._on_ready()
+        transport._read_ready()
         self.assertFalse(self.protocol.data_received.called)
 
         self.sslsock.recv.side_effect = InterruptedError
-        transport._on_ready()
+        transport._read_ready()
         self.assertFalse(self.protocol.data_received.called)
 
-    def test_on_ready_recv_exc(self):
+    def test_read_ready_recv_write(self):
+        self.loop.remove_reader = unittest.mock.Mock()
+        self.loop.add_writer = unittest.mock.Mock()
+        self.sslsock.recv.side_effect = ssl.SSLWantWriteError
+        transport = self._make_one()
+        transport._read_ready()
+        self.assertFalse(self.protocol.data_received.called)
+        self.assertTrue(transport._read_wants_write)
+
+        self.loop.remove_reader.assert_called_with(transport._sock_fd)
+        self.loop.add_writer.assert_called_with(
+            transport._sock_fd, transport._write_ready)
+
+    def test_read_ready_recv_exc(self):
         err = self.sslsock.recv.side_effect = OSError()
         transport = self._make_one()
         transport._fatal_error = unittest.mock.Mock()
-        transport._on_ready()
+        transport._read_ready()
         transport._fatal_error.assert_called_with(err)
 
-    def test_on_ready_send(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send(self):
         self.sslsock.send.return_value = 4
         transport = self._make_one()
         transport._buffer = collections.deque([b'data'])
-        transport._on_ready()
+        transport._write_ready()
         self.assertEqual(collections.deque(), transport._buffer)
         self.assertTrue(self.sslsock.send.called)
 
-    def test_on_ready_send_none(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_none(self):
         self.sslsock.send.return_value = 0
         transport = self._make_one()
         transport._buffer = collections.deque([b'data1', b'data2'])
-        transport._on_ready()
+        transport._write_ready()
         self.assertTrue(self.sslsock.send.called)
         self.assertEqual(collections.deque([b'data1data2']), transport._buffer)
 
-    def test_on_ready_send_partial(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_partial(self):
         self.sslsock.send.return_value = 2
         transport = self._make_one()
         transport._buffer = collections.deque([b'data1', b'data2'])
-        transport._on_ready()
+        transport._write_ready()
         self.assertTrue(self.sslsock.send.called)
         self.assertEqual(collections.deque([b'ta1data2']), transport._buffer)
 
-    def test_on_ready_send_closing_partial(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_closing_partial(self):
         self.sslsock.send.return_value = 2
         transport = self._make_one()
         transport._buffer = collections.deque([b'data1', b'data2'])
-        transport._on_ready()
+        transport._write_ready()
         self.assertTrue(self.sslsock.send.called)
         self.assertFalse(self.sslsock.close.called)
 
-    def test_on_ready_send_closing(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_closing(self):
         self.sslsock.send.return_value = 4
         transport = self._make_one()
         transport.close()
         transport._buffer = collections.deque([b'data'])
-        transport._on_ready()
+        transport._write_ready()
         self.assertFalse(self.loop.writers)
         self.protocol.connection_lost.assert_called_with(None)
 
-    def test_on_ready_send_closing_empty_buffer(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_closing_empty_buffer(self):
         self.sslsock.send.return_value = 4
         transport = self._make_one()
         transport.close()
         transport._buffer = collections.deque()
-        transport._on_ready()
+        transport._write_ready()
         self.assertFalse(self.loop.writers)
         self.protocol.connection_lost.assert_called_with(None)
 
-    def test_on_ready_send_retry(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
-
+    def test_write_ready_send_retry(self):
         transport = self._make_one()
         transport._buffer = collections.deque([b'data'])
 
-        self.sslsock.send.side_effect = ssl.SSLWantReadError
-        transport._on_ready()
-        self.assertTrue(self.sslsock.send.called)
-        self.assertEqual(collections.deque([b'data']), transport._buffer)
-
         self.sslsock.send.side_effect = ssl.SSLWantWriteError
-        transport._on_ready()
+        transport._write_ready()
         self.assertEqual(collections.deque([b'data']), transport._buffer)
 
         self.sslsock.send.side_effect = BlockingIOError()
-        transport._on_ready()
+        transport._write_ready()
         self.assertEqual(collections.deque([b'data']), transport._buffer)
 
-    def test_on_ready_send_exc(self):
-        self.sslsock.recv.side_effect = ssl.SSLWantReadError
+    def test_write_ready_send_read(self):
+        transport = self._make_one()
+        transport._buffer = collections.deque([b'data'])
+
+        self.loop.remove_writer = unittest.mock.Mock()
+        self.sslsock.send.side_effect = ssl.SSLWantReadError
+        transport._write_ready()
+        self.assertFalse(self.protocol.data_received.called)
+        self.assertTrue(transport._write_wants_read)
+        self.loop.remove_writer.assert_called_with(transport._sock_fd)
+
+    def test_write_ready_send_exc(self):
         err = self.sslsock.send.side_effect = OSError()
 
         transport = self._make_one()
         transport._buffer = collections.deque([b'data'])
         transport._fatal_error = unittest.mock.Mock()
-        transport._on_ready()
+        transport._write_ready()
         transport._fatal_error.assert_called_with(err)
         self.assertEqual(collections.deque(), transport._buffer)
 
+    def test_write_ready_read_wants_write(self):
+        self.loop.add_reader = unittest.mock.Mock()
+        self.sslsock.send.side_effect = BlockingIOError
+        transport = self._make_one()
+        transport._read_wants_write = True
+        transport._read_ready = unittest.mock.Mock()
+        transport._write_ready()
+
+        self.assertFalse(transport._read_wants_write)
+        transport._read_ready.assert_called_with()
+        self.loop.add_reader.assert_called_with(
+            transport._sock_fd, transport._read_ready)
+
     def test_write_eof(self):
         tr = self._make_one()
         self.assertFalse(tr.can_write_eof())
@@ -1245,6 +1288,15 @@
             server_hostname='localhost')
 
 
+class SelectorSslWithoutSslTransportTests(unittest.TestCase):
+
+    @unittest.mock.patch('asyncio.selector_events.ssl', None)
+    def test_ssl_transport_requires_ssl_module(self):
+        Mock = unittest.mock.Mock
+        with self.assertRaises(RuntimeError):
+            transport = _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
+
+
 class SelectorDatagramTransportTests(unittest.TestCase):
 
     def setUp(self):
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -77,7 +77,7 @@
             stream_reader = streams.StreamReader(loop=self.loop)
             protocol = streams.StreamReaderProtocol(stream_reader)
             trans, proto = yield from self.loop.create_pipe_connection(
-                lambda:protocol, ADDRESS)
+                lambda: protocol, ADDRESS)
             self.assertIsInstance(trans, transports.Transport)
             self.assertEqual(protocol, proto)
             clients.append((stream_reader, trans))

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list