[Python-checkins] cpython (merge default -> default): Merge heads
serhiy.storchaka
python-checkins at python.org
Sat Nov 2 09:48:41 CET 2013
http://hg.python.org/cpython/rev/e92bba5b53db
changeset: 86837:e92bba5b53db
parent: 86835:ab7c2c1d349c
parent: 86833:123804a72a8f
user: Serhiy Storchaka <storchaka at gmail.com>
date: Sat Nov 02 10:47:57 2013 +0200
summary:
Merge heads
files:
Lib/asyncio/base_events.py | 30 +-
Lib/asyncio/constants.py | 5 +-
Lib/asyncio/events.py | 15 +-
Lib/asyncio/selector_events.py | 154 ++++++---
Lib/asyncio/tasks.py | 5 +-
Lib/asyncio/windows_events.py | 16 +
Lib/asyncio/windows_utils.py | 20 +-
Lib/test/test_asyncio/test_base_events.py | 78 ++++-
Lib/test/test_asyncio/test_events.py | 3 +-
Lib/test/test_asyncio/test_selector_events.py | 154 ++++++---
Lib/test/test_asyncio/test_windows_events.py | 2 +-
11 files changed, 350 insertions(+), 132 deletions(-)
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -186,6 +186,11 @@
self.call_soon(_raise_stop_error)
def close(self):
+ """Close the event loop.
+
+ This clears the queues and shuts down the executor,
+ but does not wait for the executor to finish.
+ """
self._ready.clear()
self._scheduled.clear()
executor = self._default_executor
@@ -275,8 +280,27 @@
@tasks.coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None):
+ local_addr=None, server_hostname=None):
"""XXX"""
+ if server_hostname is not None and not ssl:
+ raise ValueError('server_hostname is only meaningful with ssl')
+
+ if server_hostname is None and ssl:
+ # Use host as default for server_hostname. It is an error
+ # if host is empty or not set, e.g. when an
+ # already-connected socket was passed or when only a port
+ # is given. To avoid this error, you can pass
+ # server_hostname='' -- this will bypass the hostname
+ # check. (This also means that if host is a numeric
+ # IP/IPv6 address, we will attempt to verify that exact
+ # address; this will probably fail, but it is possible to
+ # create a certificate for a specific IP address, so we
+ # don't judge it here.)
+ if not host:
+ raise ValueError('You must set server_hostname '
+ 'when using ssl without a host')
+ server_hostname = host
+
if host is not None or port is not None:
if sock is not None:
raise ValueError(
@@ -357,7 +381,7 @@
sslcontext = None if isinstance(ssl, bool) else ssl
transport = self._make_ssl_transport(
sock, protocol, sslcontext, waiter,
- server_side=False, server_hostname=host)
+ server_side=False, server_hostname=server_hostname)
else:
transport = self._make_socket_transport(sock, protocol, waiter)
@@ -442,6 +466,8 @@
ssl=None,
reuse_address=None):
"""XXX"""
+ if isinstance(ssl, bool):
+ raise TypeError('ssl argument must be an SSLContext or None')
if host is not None or port is not None:
if sock is not None:
raise ValueError(
diff --git a/Lib/asyncio/constants.py b/Lib/asyncio/constants.py
--- a/Lib/asyncio/constants.py
+++ b/Lib/asyncio/constants.py
@@ -1,4 +1,7 @@
"""Constants."""
+# After the connection is lost, log warnings after this many write()s.
+LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
-LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
+# Seconds to wait before retrying accept().
+ACCEPT_RETRY_DELAY = 1
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -137,6 +137,17 @@
"""Return whether the event loop is currently running."""
raise NotImplementedError
+ def close(self):
+ """Close the loop.
+
+ The loop should not be running.
+
+ This is idempotent and irreversible.
+
+ No other methods should be called after this one.
+ """
+ raise NotImplementedError
+
# Methods scheduling callbacks. All these return Handles.
def call_soon(self, callback, *args):
@@ -172,7 +183,7 @@
def create_connection(self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None):
+ local_addr=None, server_hostname=None):
raise NotImplementedError
def create_server(self, protocol_factory, host=None, port=None, *,
@@ -214,6 +225,8 @@
family=0, proto=0, flags=0):
raise NotImplementedError
+ # Pipes and subprocesses.
+
def connect_read_pipe(self, protocol_factory, pipe):
"""Register read pipe in eventloop.
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -5,6 +5,7 @@
"""
import collections
+import errno
import socket
try:
import ssl
@@ -89,28 +90,37 @@
except (BlockingIOError, InterruptedError):
pass
- def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
+ def _start_serving(self, protocol_factory, sock,
+ sslcontext=None, server=None):
self.add_reader(sock.fileno(), self._accept_connection,
- protocol_factory, sock, ssl, server)
+ protocol_factory, sock, sslcontext, server)
- def _accept_connection(self, protocol_factory, sock, ssl=None,
- server=None):
+ def _accept_connection(self, protocol_factory, sock,
+ sslcontext=None, server=None):
try:
conn, addr = sock.accept()
conn.setblocking(False)
- except (BlockingIOError, InterruptedError):
+ except (BlockingIOError, InterruptedError, ConnectionAbortedError):
pass # False alarm.
- except Exception:
- # Bad error. Stop serving.
- self.remove_reader(sock.fileno())
- sock.close()
+ except OSError as exc:
# There's nowhere to send the error, so just log it.
# TODO: Someone will want an error handler for this.
- logger.exception('Accept failed')
+ if exc.errno in (errno.EMFILE, errno.ENFILE,
+ errno.ENOBUFS, errno.ENOMEM):
+ # Some platforms (e.g. Linux) keep reporting the FD as
+ # ready, so we remove the read handler temporarily.
+ # We'll try again in a while.
+ logger.exception('Accept out of system resource (%s)', exc)
+ self.remove_reader(sock.fileno())
+ self.call_later(constants.ACCEPT_RETRY_DELAY,
+ self._start_serving,
+ protocol_factory, sock, sslcontext, server)
+ else:
+ raise # The event loop will catch, log and ignore it.
else:
- if ssl:
+ if sslcontext:
self._make_ssl_transport(
- conn, protocol_factory(), ssl, None,
+ conn, protocol_factory(), sslcontext, None,
server_side=True, extra={'peername': addr}, server=server)
else:
self._make_socket_transport(
@@ -277,7 +287,7 @@
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to the except clause below.
- raise OSError(err, 'Connect call failed')
+ raise OSError(err, 'Connect call failed %s' % (address,))
except (BlockingIOError, InterruptedError):
self.add_writer(fd, self._sock_connect, fut, True, sock, address)
except Exception as exc:
@@ -404,15 +414,16 @@
try:
self._protocol.pause_writing()
except Exception:
- tulip_log.exception('pause_writing() failed')
+ logger.exception('pause_writing() failed')
def _maybe_resume_protocol(self):
- if self._protocol_paused and self.get_write_buffer_size() <= self._low_water:
+ if (self._protocol_paused and
+ self.get_write_buffer_size() <= self._low_water):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except Exception:
- tulip_log.exception('resume_writing() failed')
+ logger.exception('resume_writing() failed')
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
@@ -548,22 +559,28 @@
def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
server_side=False, server_hostname=None,
extra=None, server=None):
+ if ssl is None:
+ raise RuntimeError('stdlib ssl module not available')
+
if server_side:
- assert isinstance(
- sslcontext, ssl.SSLContext), 'Must pass an SSLContext'
+ if not sslcontext:
+ raise ValueError('Server side ssl needs a valid SSLContext')
else:
- # Client-side may pass ssl=True to use a default context.
- # The default is the same as used by urllib.
- if sslcontext is None:
+ if not sslcontext:
+ # Client side may pass ssl=True to use a default
+ # context; in that case the sslcontext passed is None.
+ # The default is the same as used by urllib with
+ # cadefault=True.
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext.options |= ssl.OP_NO_SSLv2
sslcontext.set_default_verify_paths()
sslcontext.verify_mode = ssl.CERT_REQUIRED
+
wrap_kwargs = {
'server_side': server_side,
'do_handshake_on_connect': False,
}
- if server_hostname is not None and not server_side and ssl.HAS_SNI:
+ if server_hostname and not server_side and ssl.HAS_SNI:
wrap_kwargs['server_hostname'] = server_hostname
sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
@@ -609,7 +626,7 @@
# Verify hostname if requested.
peercert = self._sock.getpeercert()
- if (self._server_hostname is not None and
+ if (self._server_hostname and
self._sslcontext.verify_mode != ssl.CERT_NONE):
try:
ssl.match_hostname(peercert, self._server_hostname)
@@ -625,15 +642,16 @@
compression=self._sock.compression(),
)
- self._loop.add_reader(self._sock_fd, self._on_ready)
- self._loop.add_writer(self._sock_fd, self._on_ready)
+ self._read_wants_write = False
+ self._write_wants_read = False
+ self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
if self._waiter is not None:
self._loop.call_soon(self._waiter.set_result, None)
def pause_reading(self):
# XXX This is a bit icky, given the comment at the top of
- # _on_ready(). Is it possible to evoke a deadlock? I don't
+ # _read_ready(). Is it possible to evoke a deadlock? I don't
# know, although it doesn't look like it; write() will still
# accept more data for the buffer and eventually the app will
# call resume_reading() again, and things will flow again.
@@ -648,41 +666,58 @@
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._on_ready)
+ self._loop.add_reader(self._sock_fd, self._read_ready)
- def _on_ready(self):
- # Because of renegotiations (?), there's no difference between
- # readable and writable. We just try both. XXX This may be
- # incorrect; we probably need to keep state about what we
- # should do next.
+ def _read_ready(self):
+ if self._write_wants_read:
+ self._write_wants_read = False
+ self._write_ready()
- # First try reading.
- if not self._closing and not self._paused:
- try:
- data = self._sock.recv(self.max_size)
- except (BlockingIOError, InterruptedError,
- ssl.SSLWantReadError, ssl.SSLWantWriteError):
- pass
- except Exception as exc:
- self._fatal_error(exc)
+ if self._buffer:
+ self._loop.add_writer(self._sock_fd, self._write_ready)
+
+ try:
+ data = self._sock.recv(self.max_size)
+ except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
+ pass
+ except ssl.SSLWantWriteError:
+ self._read_wants_write = True
+ self._loop.remove_reader(self._sock_fd)
+ self._loop.add_writer(self._sock_fd, self._write_ready)
+ except Exception as exc:
+ self._fatal_error(exc)
+ else:
+ if data:
+ self._protocol.data_received(data)
else:
- if data:
- self._protocol.data_received(data)
- else:
- try:
- self._protocol.eof_received()
- finally:
- self.close()
+ try:
+ keep_open = self._protocol.eof_received()
+ if keep_open:
+ logger.warning('returning true from eof_received() '
+ 'has no effect when using ssl')
+ finally:
+ self.close()
- # Now try writing, if there's anything to write.
+ def _write_ready(self):
+ if self._read_wants_write:
+ self._read_wants_write = False
+ self._read_ready()
+
+ if not (self._paused or self._closing):
+ self._loop.add_reader(self._sock_fd, self._read_ready)
+
if self._buffer:
data = b''.join(self._buffer)
self._buffer.clear()
try:
n = self._sock.send(data)
except (BlockingIOError, InterruptedError,
- ssl.SSLWantReadError, ssl.SSLWantWriteError):
+ ssl.SSLWantWriteError):
n = 0
+ except ssl.SSLWantReadError:
+ n = 0
+ self._loop.remove_writer(self._sock_fd)
+ self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._fatal_error(exc)
@@ -691,11 +726,12 @@
if n < len(data):
self._buffer.append(data[n:])
- self._maybe_resume_protocol() # May append to buffer.
+ self._maybe_resume_protocol() # May append to buffer.
- if self._closing and not self._buffer:
+ if not self._buffer:
self._loop.remove_writer(self._sock_fd)
- self._call_connection_lost(None)
+ if self._closing:
+ self._call_connection_lost(None)
def write(self, data):
assert isinstance(data, bytes), repr(type(data))
@@ -708,20 +744,16 @@
self._conn_lost += 1
return
- # We could optimize, but the callback can do this for now.
+ if not self._buffer:
+ self._loop.add_writer(self._sock_fd, self._write_ready)
+
+ # Add it to the buffer.
self._buffer.append(data)
self._maybe_pause_protocol()
def can_write_eof(self):
return False
- def close(self):
- if self._closing:
- return
- self._closing = True
- self._conn_lost += 1
- self._loop.remove_reader(self._sock_fd)
-
class _SelectorDatagramTransport(_SelectorTransport):
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -62,8 +62,9 @@
code = func.__code__
filename = code.co_filename
lineno = code.co_firstlineno
- logger.error('Coroutine %r defined at %s:%s was never yielded from',
- func.__name__, filename, lineno)
+ logger.error(
+ 'Coroutine %r defined at %s:%s was never yielded from',
+ func.__name__, filename, lineno)
def coroutine(func):
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -138,6 +138,7 @@
@tasks.coroutine
def start_serving_pipe(self, protocol_factory, address):
server = PipeServer(address)
+
def loop(f=None):
pipe = None
try:
@@ -160,6 +161,7 @@
pipe.close()
else:
f.add_done_callback(loop)
+
self.call_soon(loop)
return [server]
@@ -209,6 +211,7 @@
ov.WSARecv(conn.fileno(), nbytes, flags)
else:
ov.ReadFile(conn.fileno(), nbytes)
+
def finish(trans, key, ov):
try:
return ov.getresult()
@@ -217,6 +220,7 @@
raise ConnectionResetError(*exc.args)
else:
raise
+
return self._register(ov, conn, finish)
def send(self, conn, buf, flags=0):
@@ -226,6 +230,7 @@
ov.WSASend(conn.fileno(), buf, flags)
else:
ov.WriteFile(conn.fileno(), buf)
+
def finish(trans, key, ov):
try:
return ov.getresult()
@@ -234,6 +239,7 @@
raise ConnectionResetError(*exc.args)
else:
raise
+
return self._register(ov, conn, finish)
def accept(self, listener):
@@ -241,6 +247,7 @@
conn = self._get_accept_socket(listener.family)
ov = _overlapped.Overlapped(NULL)
ov.AcceptEx(listener.fileno(), conn.fileno())
+
def finish_accept(trans, key, ov):
ov.getresult()
# Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
@@ -249,6 +256,7 @@
_overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
conn.settimeout(listener.gettimeout())
return conn, conn.getpeername()
+
return self._register(ov, listener, finish_accept)
def connect(self, conn, address):
@@ -264,26 +272,31 @@
raise
ov = _overlapped.Overlapped(NULL)
ov.ConnectEx(conn.fileno(), address)
+
def finish_connect(trans, key, ov):
ov.getresult()
# Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
conn.setsockopt(socket.SOL_SOCKET,
_overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
return conn
+
return self._register(ov, conn, finish_connect)
def accept_pipe(self, pipe):
self._register_with_iocp(pipe)
ov = _overlapped.Overlapped(NULL)
ov.ConnectNamedPipe(pipe.fileno())
+
def finish(trans, key, ov):
ov.getresult()
return pipe
+
return self._register(ov, pipe, finish)
def connect_pipe(self, address):
ov = _overlapped.Overlapped(NULL)
ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
+
def finish(err, handle, ov):
# err, handle were arguments passed to PostQueuedCompletionStatus()
# in a function run in a thread pool.
@@ -296,6 +309,7 @@
raise OSError(0, msg, None, err)
else:
return windows_utils.PipeHandle(handle)
+
return self._register(ov, None, finish, wait_for_post=True)
def wait_for_handle(self, handle, timeout=None):
@@ -432,8 +446,10 @@
self._proc = windows_utils.Popen(
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
bufsize=bufsize, **kwargs)
+
def callback(f):
returncode = self._proc.poll()
self._process_exited(returncode)
+
f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
f.add_done_callback(callback)
diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py
--- a/Lib/asyncio/windows_utils.py
+++ b/Lib/asyncio/windows_utils.py
@@ -18,18 +18,18 @@
__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
-#
+
# Constants/globals
-#
+
BUFSIZE = 8192
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
_mmap_counter = itertools.count()
-#
+
# Replacement for socket.socketpair()
-#
+
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
"""A socket pair usable as a self-pipe, for Windows.
@@ -57,9 +57,9 @@
lsock.close()
return (ssock, csock)
-#
+
# Replacement for os.pipe() using handles instead of fds
-#
+
def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE):
"""Like os.pipe() but with overlapped support and using handles not fds."""
@@ -105,9 +105,9 @@
_winapi.CloseHandle(h2)
raise
-#
+
# Wrapper for a pipe handle
-#
+
class PipeHandle:
"""Wrapper for an overlapped pipe handle which is vaguely file-object like.
@@ -137,9 +137,9 @@
def __exit__(self, t, v, tb):
self.close()
-#
+
# Replacement for subprocess.Popen using overlapped pipe handles
-#
+
class Popen(subprocess.Popen):
"""Replacement for subprocess.Popen using overlapped pipe handles.
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1,5 +1,6 @@
"""Tests for base_events.py"""
+import errno
import logging
import socket
import time
@@ -8,6 +9,7 @@
from test.support import find_unused_port, IPV6_ENABLED
from asyncio import base_events
+from asyncio import constants
from asyncio import events
from asyncio import futures
from asyncio import protocols
@@ -442,6 +444,71 @@
self.assertRaises(
OSError, self.loop.run_until_complete, coro)
+ def test_create_connection_ssl_server_hostname_default(self):
+ self.loop.getaddrinfo = unittest.mock.Mock()
+
+ def mock_getaddrinfo(*args, **kwds):
+ f = futures.Future(loop=self.loop)
+ f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
+ socket.SOL_TCP, '', ('1.2.3.4', 80))])
+ return f
+
+ self.loop.getaddrinfo.side_effect = mock_getaddrinfo
+ self.loop.sock_connect = unittest.mock.Mock()
+ self.loop.sock_connect.return_value = ()
+ self.loop._make_ssl_transport = unittest.mock.Mock()
+
+ def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
+ **kwds):
+ waiter.set_result(None)
+
+ self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
+ ANY = unittest.mock.ANY
+ # First try the default server_hostname.
+ self.loop._make_ssl_transport.reset_mock()
+ coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True)
+ self.loop.run_until_complete(coro)
+ self.loop._make_ssl_transport.assert_called_with(
+ ANY, ANY, ANY, ANY,
+ server_side=False,
+ server_hostname='python.org')
+ # Next try an explicit server_hostname.
+ self.loop._make_ssl_transport.reset_mock()
+ coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
+ server_hostname='perl.com')
+ self.loop.run_until_complete(coro)
+ self.loop._make_ssl_transport.assert_called_with(
+ ANY, ANY, ANY, ANY,
+ server_side=False,
+ server_hostname='perl.com')
+ # Finally try an explicit empty server_hostname.
+ self.loop._make_ssl_transport.reset_mock()
+ coro = self.loop.create_connection(MyProto, 'python.org', 80, ssl=True,
+ server_hostname='')
+ self.loop.run_until_complete(coro)
+ self.loop._make_ssl_transport.assert_called_with(ANY, ANY, ANY, ANY,
+ server_side=False,
+ server_hostname='')
+
+ def test_create_connection_no_ssl_server_hostname_errors(self):
+ # When not using ssl, server_hostname must be None.
+ coro = self.loop.create_connection(MyProto, 'python.org', 80,
+ server_hostname='')
+ self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+ coro = self.loop.create_connection(MyProto, 'python.org', 80,
+ server_hostname='python.org')
+ self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+
+ def test_create_connection_ssl_server_hostname_errors(self):
+ # When using ssl, server_hostname may be None if host is non-empty.
+ coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
+ self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+ coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
+ self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+ coro = self.loop.create_connection(MyProto, None, None,
+ ssl=True, sock=socket.socket())
+ self.assertRaises(ValueError, self.loop.run_until_complete, coro)
+
def test_create_server_empty_host(self):
# if host is empty string use None instead
host = object()
@@ -585,11 +652,18 @@
def test_accept_connection_exception(self, m_log):
sock = unittest.mock.Mock()
sock.fileno.return_value = 10
- sock.accept.side_effect = OSError()
+ sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
+ self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.call_later = unittest.mock.Mock()
self.loop._accept_connection(MyProto, sock)
- self.assertTrue(sock.close.called)
self.assertTrue(m_log.exception.called)
+ self.assertFalse(sock.close.called)
+ self.loop.remove_reader.assert_called_with(10)
+ self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
+ # self.loop._start_serving
+ unittest.mock.ANY,
+ MyProto, sock, None, None)
if __name__ == '__main__':
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1276,7 +1276,6 @@
def create_event_loop(self):
return windows_events.SelectorEventLoop()
-
class ProactorEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
unittest.TestCase):
@@ -1472,6 +1471,8 @@
self.assertRaises(
NotImplementedError, loop.is_running)
self.assertRaises(
+ NotImplementedError, loop.close)
+ self.assertRaises(
NotImplementedError, loop.call_later, None, None)
self.assertRaises(
NotImplementedError, loop.call_at, f, f)
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -43,6 +43,7 @@
self.assertIsInstance(
self.loop._make_socket_transport(m, m), _SelectorSocketTransport)
+ @unittest.skipIf(ssl is None, 'No ssl module')
def test_make_ssl_transport(self):
m = unittest.mock.Mock()
self.loop.add_reader = unittest.mock.Mock()
@@ -52,6 +53,16 @@
self.assertIsInstance(
self.loop._make_ssl_transport(m, m, m, m), _SelectorSslTransport)
+ @unittest.mock.patch('asyncio.selector_events.ssl', None)
+ def test_make_ssl_transport_without_ssl_error(self):
+ m = unittest.mock.Mock()
+ self.loop.add_reader = unittest.mock.Mock()
+ self.loop.add_writer = unittest.mock.Mock()
+ self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.remove_writer = unittest.mock.Mock()
+ with self.assertRaises(RuntimeError):
+ self.loop._make_ssl_transport(m, m, m, m)
+
def test_close(self):
ssock = self.loop._ssock
ssock.fileno.return_value = 7
@@ -1003,8 +1014,7 @@
self.loop, self.sock, self.protocol, self.sslcontext,
waiter=waiter)
self.assertTrue(self.sslsock.do_handshake.called)
- self.loop.assert_reader(1, tr._on_ready)
- self.loop.assert_writer(1, tr._on_ready)
+ self.loop.assert_reader(1, tr._read_ready)
test_utils.run_briefly(self.loop)
self.assertIsNone(waiter.result())
@@ -1047,13 +1057,13 @@
def test_pause_resume_reading(self):
tr = self._make_one()
self.assertFalse(tr._paused)
- self.loop.assert_reader(1, tr._on_ready)
+ self.loop.assert_reader(1, tr._read_ready)
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(1 in self.loop.readers)
tr.resume_reading()
self.assertFalse(tr._paused)
- self.loop.assert_reader(1, tr._on_ready)
+ self.loop.assert_reader(1, tr._read_ready)
def test_write_no_data(self):
transport = self._make_one()
@@ -1084,140 +1094,173 @@
transport.write(b'data')
m_log.warning.assert_called_with('socket.send() raised exception.')
- def test_on_ready_recv(self):
+ def test_read_ready_recv(self):
self.sslsock.recv.return_value = b'data'
transport = self._make_one()
- transport._on_ready()
+ transport._read_ready()
self.assertTrue(self.sslsock.recv.called)
self.assertEqual((b'data',), self.protocol.data_received.call_args[0])
- def test_on_ready_recv_eof(self):
+ def test_read_ready_write_wants_read(self):
+ self.loop.add_writer = unittest.mock.Mock()
+ self.sslsock.recv.side_effect = BlockingIOError
+ transport = self._make_one()
+ transport._write_wants_read = True
+ transport._write_ready = unittest.mock.Mock()
+ transport._buffer.append(b'data')
+ transport._read_ready()
+
+ self.assertFalse(transport._write_wants_read)
+ transport._write_ready.assert_called_with()
+ self.loop.add_writer.assert_called_with(
+ transport._sock_fd, transport._write_ready)
+
+ def test_read_ready_recv_eof(self):
self.sslsock.recv.return_value = b''
transport = self._make_one()
transport.close = unittest.mock.Mock()
- transport._on_ready()
+ transport._read_ready()
transport.close.assert_called_with()
self.protocol.eof_received.assert_called_with()
- def test_on_ready_recv_conn_reset(self):
+ def test_read_ready_recv_conn_reset(self):
err = self.sslsock.recv.side_effect = ConnectionResetError()
transport = self._make_one()
transport._force_close = unittest.mock.Mock()
- transport._on_ready()
+ transport._read_ready()
transport._force_close.assert_called_with(err)
- def test_on_ready_recv_retry(self):
+ def test_read_ready_recv_retry(self):
self.sslsock.recv.side_effect = ssl.SSLWantReadError
transport = self._make_one()
- transport._on_ready()
+ transport._read_ready()
self.assertTrue(self.sslsock.recv.called)
self.assertFalse(self.protocol.data_received.called)
- self.sslsock.recv.side_effect = ssl.SSLWantWriteError
- transport._on_ready()
- self.assertFalse(self.protocol.data_received.called)
-
self.sslsock.recv.side_effect = BlockingIOError
- transport._on_ready()
+ transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
self.sslsock.recv.side_effect = InterruptedError
- transport._on_ready()
+ transport._read_ready()
self.assertFalse(self.protocol.data_received.called)
- def test_on_ready_recv_exc(self):
+ def test_read_ready_recv_write(self):
+ self.loop.remove_reader = unittest.mock.Mock()
+ self.loop.add_writer = unittest.mock.Mock()
+ self.sslsock.recv.side_effect = ssl.SSLWantWriteError
+ transport = self._make_one()
+ transport._read_ready()
+ self.assertFalse(self.protocol.data_received.called)
+ self.assertTrue(transport._read_wants_write)
+
+ self.loop.remove_reader.assert_called_with(transport._sock_fd)
+ self.loop.add_writer.assert_called_with(
+ transport._sock_fd, transport._write_ready)
+
+ def test_read_ready_recv_exc(self):
err = self.sslsock.recv.side_effect = OSError()
transport = self._make_one()
transport._fatal_error = unittest.mock.Mock()
- transport._on_ready()
+ transport._read_ready()
transport._fatal_error.assert_called_with(err)
- def test_on_ready_send(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send(self):
self.sslsock.send.return_value = 4
transport = self._make_one()
transport._buffer = collections.deque([b'data'])
- transport._on_ready()
+ transport._write_ready()
self.assertEqual(collections.deque(), transport._buffer)
self.assertTrue(self.sslsock.send.called)
- def test_on_ready_send_none(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_none(self):
self.sslsock.send.return_value = 0
transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])
- transport._on_ready()
+ transport._write_ready()
self.assertTrue(self.sslsock.send.called)
self.assertEqual(collections.deque([b'data1data2']), transport._buffer)
- def test_on_ready_send_partial(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_partial(self):
self.sslsock.send.return_value = 2
transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])
- transport._on_ready()
+ transport._write_ready()
self.assertTrue(self.sslsock.send.called)
self.assertEqual(collections.deque([b'ta1data2']), transport._buffer)
- def test_on_ready_send_closing_partial(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_closing_partial(self):
self.sslsock.send.return_value = 2
transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])
- transport._on_ready()
+ transport._write_ready()
self.assertTrue(self.sslsock.send.called)
self.assertFalse(self.sslsock.close.called)
- def test_on_ready_send_closing(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_closing(self):
self.sslsock.send.return_value = 4
transport = self._make_one()
transport.close()
transport._buffer = collections.deque([b'data'])
- transport._on_ready()
+ transport._write_ready()
self.assertFalse(self.loop.writers)
self.protocol.connection_lost.assert_called_with(None)
- def test_on_ready_send_closing_empty_buffer(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_closing_empty_buffer(self):
self.sslsock.send.return_value = 4
transport = self._make_one()
transport.close()
transport._buffer = collections.deque()
- transport._on_ready()
+ transport._write_ready()
self.assertFalse(self.loop.writers)
self.protocol.connection_lost.assert_called_with(None)
- def test_on_ready_send_retry(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
-
+ def test_write_ready_send_retry(self):
transport = self._make_one()
transport._buffer = collections.deque([b'data'])
- self.sslsock.send.side_effect = ssl.SSLWantReadError
- transport._on_ready()
- self.assertTrue(self.sslsock.send.called)
- self.assertEqual(collections.deque([b'data']), transport._buffer)
-
self.sslsock.send.side_effect = ssl.SSLWantWriteError
- transport._on_ready()
+ transport._write_ready()
self.assertEqual(collections.deque([b'data']), transport._buffer)
self.sslsock.send.side_effect = BlockingIOError()
- transport._on_ready()
+ transport._write_ready()
self.assertEqual(collections.deque([b'data']), transport._buffer)
- def test_on_ready_send_exc(self):
- self.sslsock.recv.side_effect = ssl.SSLWantReadError
+ def test_write_ready_send_read(self):
+ transport = self._make_one()
+ transport._buffer = collections.deque([b'data'])
+
+ self.loop.remove_writer = unittest.mock.Mock()
+ self.sslsock.send.side_effect = ssl.SSLWantReadError
+ transport._write_ready()
+ self.assertFalse(self.protocol.data_received.called)
+ self.assertTrue(transport._write_wants_read)
+ self.loop.remove_writer.assert_called_with(transport._sock_fd)
+
+ def test_write_ready_send_exc(self):
err = self.sslsock.send.side_effect = OSError()
transport = self._make_one()
transport._buffer = collections.deque([b'data'])
transport._fatal_error = unittest.mock.Mock()
- transport._on_ready()
+ transport._write_ready()
transport._fatal_error.assert_called_with(err)
self.assertEqual(collections.deque(), transport._buffer)
+ def test_write_ready_read_wants_write(self):
+ self.loop.add_reader = unittest.mock.Mock()
+ self.sslsock.send.side_effect = BlockingIOError
+ transport = self._make_one()
+ transport._read_wants_write = True
+ transport._read_ready = unittest.mock.Mock()
+ transport._write_ready()
+
+ self.assertFalse(transport._read_wants_write)
+ transport._read_ready.assert_called_with()
+ self.loop.add_reader.assert_called_with(
+ transport._sock_fd, transport._read_ready)
+
def test_write_eof(self):
tr = self._make_one()
self.assertFalse(tr.can_write_eof())
@@ -1245,6 +1288,15 @@
server_hostname='localhost')
+class SelectorSslWithoutSslTransportTests(unittest.TestCase):
+
+ @unittest.mock.patch('asyncio.selector_events.ssl', None)
+ def test_ssl_transport_requires_ssl_module(self):
+ Mock = unittest.mock.Mock
+ with self.assertRaises(RuntimeError):
+ transport = _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
+
+
class SelectorDatagramTransportTests(unittest.TestCase):
def setUp(self):
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -77,7 +77,7 @@
stream_reader = streams.StreamReader(loop=self.loop)
protocol = streams.StreamReaderProtocol(stream_reader)
trans, proto = yield from self.loop.create_pipe_connection(
- lambda:protocol, ADDRESS)
+ lambda: protocol, ADDRESS)
self.assertIsInstance(trans, transports.Transport)
self.assertEqual(protocol, proto)
clients.append((stream_reader, trans))
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list