[pypy-commit] pypy vendor/stdlib-3.5: Import stdlib from CPython 3.5.3 (changeset 'v3.5.3')
arigo
pypy.commits at gmail.com
Sun Feb 5 13:44:18 EST 2017
Author: Armin Rigo <arigo at tunes.org>
Branch: vendor/stdlib-3.5
Changeset: r89950:a9167602e69c
Date: 2017-02-05 19:12 +0100
http://bitbucket.org/pypy/pypy/changeset/a9167602e69c/
Log: Import stdlib from CPython 3.5.3 (changeset 'v3.5.3')
diff too long, truncating to 2000 out of 24911 lines
diff --git a/lib-python/3/_collections_abc.py b/lib-python/3/_collections_abc.py
--- a/lib-python/3/_collections_abc.py
+++ b/lib-python/3/_collections_abc.py
@@ -29,8 +29,8 @@
# so that they will pass tests like:
# it = iter(somebytearray)
# assert isinstance(it, Iterable)
-# Note: in other implementations, these types many not be distinct
-# and they make have their own implementation specific types that
+# Note: in other implementations, these types might not be distinct
+# and they may have their own implementation specific types that
# are not included on this list.
bytes_iterator = type(iter(b''))
bytearray_iterator = type(iter(bytearray()))
@@ -41,6 +41,7 @@
list_iterator = type(iter([]))
list_reverseiterator = type(iter(reversed([])))
range_iterator = type(iter(range(0)))
+longrange_iterator = type(iter(range(1 << 1000)))
set_iterator = type(iter(set()))
str_iterator = type(iter(""))
tuple_iterator = type(iter(()))
@@ -234,6 +235,7 @@
Iterator.register(list_iterator)
Iterator.register(list_reverseiterator)
Iterator.register(range_iterator)
+Iterator.register(longrange_iterator)
Iterator.register(set_iterator)
Iterator.register(str_iterator)
Iterator.register(tuple_iterator)
diff --git a/lib-python/3/_pydecimal.py b/lib-python/3/_pydecimal.py
--- a/lib-python/3/_pydecimal.py
+++ b/lib-python/3/_pydecimal.py
@@ -1068,12 +1068,11 @@
return sign + intpart + fracpart + exp
def to_eng_string(self, context=None):
- """Convert to engineering-type string.
-
- Engineering notation has an exponent which is a multiple of 3, so there
- are up to 3 digits left of the decimal place.
-
- Same rules for when in exponential and when as a value as in __str__.
+ """Convert to a string, using engineering notation if an exponent is needed.
+
+ Engineering notation has an exponent which is a multiple of 3. This
+ can leave up to 3 digits to the left of the decimal place and may
+ require the addition of either one or two trailing zeros.
"""
return self.__str__(eng=True, context=context)
@@ -4107,7 +4106,7 @@
>>> context.create_decimal_from_float(3.1415926535897932)
Traceback (most recent call last):
...
- decimal.Inexact
+ decimal.Inexact: None
"""
d = Decimal.from_float(f) # An exact conversion
@@ -5502,9 +5501,29 @@
return r
def to_eng_string(self, a):
- """Converts a number to a string, using scientific notation.
+ """Convert to a string, using engineering notation if an exponent is needed.
+
+ Engineering notation has an exponent which is a multiple of 3. This
+ can leave up to 3 digits to the left of the decimal place and may
+ require the addition of either one or two trailing zeros.
The operation is not affected by the context.
+
+ >>> ExtendedContext.to_eng_string(Decimal('123E+1'))
+ '1.23E+3'
+ >>> ExtendedContext.to_eng_string(Decimal('123E+3'))
+ '123E+3'
+ >>> ExtendedContext.to_eng_string(Decimal('123E-10'))
+ '12.3E-9'
+ >>> ExtendedContext.to_eng_string(Decimal('-123E-12'))
+ '-123E-12'
+ >>> ExtendedContext.to_eng_string(Decimal('7E-7'))
+ '700E-9'
+ >>> ExtendedContext.to_eng_string(Decimal('7E+1'))
+ '70'
+ >>> ExtendedContext.to_eng_string(Decimal('0E+1'))
+ '0.00E+3'
+
"""
a = _convert_other(a, raiseit=True)
return a.to_eng_string(context=self)
diff --git a/lib-python/3/_pyio.py b/lib-python/3/_pyio.py
--- a/lib-python/3/_pyio.py
+++ b/lib-python/3/_pyio.py
@@ -276,7 +276,7 @@
try:
UnsupportedOperation = io.UnsupportedOperation
except AttributeError:
- class UnsupportedOperation(ValueError, OSError):
+ class UnsupportedOperation(OSError, ValueError):
pass
diff --git a/lib-python/3/antigravity.py b/lib-python/3/antigravity.py
--- a/lib-python/3/antigravity.py
+++ b/lib-python/3/antigravity.py
@@ -2,7 +2,7 @@
import webbrowser
import hashlib
-webbrowser.open("http://xkcd.com/353/")
+webbrowser.open("https://xkcd.com/353/")
def geohash(latitude, longitude, datedow):
'''Compute geohash() using the Munroe algorithm.
diff --git a/lib-python/3/asyncio/base_events.py b/lib-python/3/asyncio/base_events.py
--- a/lib-python/3/asyncio/base_events.py
+++ b/lib-python/3/asyncio/base_events.py
@@ -13,7 +13,6 @@
to modify the meaning of the API call itself.
"""
-
import collections
import concurrent.futures
import heapq
@@ -28,6 +27,7 @@
import traceback
import sys
import warnings
+import weakref
from . import compat
from . import coroutines
@@ -41,9 +41,6 @@
__all__ = ['BaseEventLoop']
-# Argument for default thread pool executor creation.
-_MAX_WORKERS = 5
-
# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100
@@ -76,12 +73,29 @@
return repr(fd)
-# Linux's sock.type is a bitmask that can include extra info about socket.
-_SOCKET_TYPE_MASK = 0
-if hasattr(socket, 'SOCK_NONBLOCK'):
- _SOCKET_TYPE_MASK |= socket.SOCK_NONBLOCK
-if hasattr(socket, 'SOCK_CLOEXEC'):
- _SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC
+def _set_reuseport(sock):
+ if not hasattr(socket, 'SO_REUSEPORT'):
+ raise ValueError('reuse_port not supported by socket module')
+ else:
+ try:
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ except OSError:
+ raise ValueError('reuse_port not supported by socket module, '
+ 'SO_REUSEPORT defined but not implemented.')
+
+
+def _is_stream_socket(sock):
+ # Linux's socket.type is a bitmask that can include extra info
+ # about the socket, so we can't do a simple
+ # `sock_type == socket.SOCK_STREAM` comparison.
+ return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM
+
+
+def _is_dgram_socket(sock):
+ # Linux's socket.type is a bitmask that can include extra info
+ # about the socket, so we can't do a simple
+ # `sock_type == socket.SOCK_DGRAM` comparison.
+ return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM
def _ipaddr_info(host, port, family, type, proto):
@@ -94,8 +108,12 @@
host is None:
return None
- type &= ~_SOCKET_TYPE_MASK
if type == socket.SOCK_STREAM:
+ # Linux only:
+ # getaddrinfo() can raise when socket.type is a bit mask.
+ # So if socket.type is a bit mask of SOCK_STREAM, and say
+ # SOCK_NONBLOCK, we simply return None, which will trigger
+ # a call to getaddrinfo() letting it process this request.
proto = socket.IPPROTO_TCP
elif type == socket.SOCK_DGRAM:
proto = socket.IPPROTO_UDP
@@ -104,27 +122,21 @@
if port is None:
port = 0
- elif isinstance(port, bytes):
- if port == b'':
- port = 0
- else:
- try:
- port = int(port)
- except ValueError:
- # Might be a service name like b"http".
- port = socket.getservbyname(port.decode('ascii'))
- elif isinstance(port, str):
- if port == '':
- port = 0
- else:
- try:
- port = int(port)
- except ValueError:
- # Might be a service name like "http".
- port = socket.getservbyname(port)
+ elif isinstance(port, bytes) and port == b'':
+ port = 0
+ elif isinstance(port, str) and port == '':
+ port = 0
+ else:
+ # If port's a service name like "http", don't skip getaddrinfo.
+ try:
+ port = int(port)
+ except (TypeError, ValueError):
+ return None
if family == socket.AF_UNSPEC:
- afs = [socket.AF_INET, socket.AF_INET6]
+ afs = [socket.AF_INET]
+ if hasattr(socket, 'AF_INET6'):
+ afs.append(socket.AF_INET6)
else:
afs = [family]
@@ -242,6 +254,17 @@
self._task_factory = None
self._coroutine_wrapper_set = False
+ if hasattr(sys, 'get_asyncgen_hooks'):
+ # Python >= 3.6
+ # A weak set of all asynchronous generators that are
+ # being iterated by the loop.
+ self._asyncgens = weakref.WeakSet()
+ else:
+ self._asyncgens = None
+
+ # Set to True when `loop.shutdown_asyncgens` is called.
+ self._asyncgens_shutdown_called = False
+
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
% (self.__class__.__name__, self.is_running(),
@@ -333,14 +356,67 @@
if self._closed:
raise RuntimeError('Event loop is closed')
+ def _asyncgen_finalizer_hook(self, agen):
+ self._asyncgens.discard(agen)
+ if not self.is_closed():
+ self.create_task(agen.aclose())
+ # Wake up the loop if the finalizer was called from
+ # a different thread.
+ self._write_to_self()
+
+ def _asyncgen_firstiter_hook(self, agen):
+ if self._asyncgens_shutdown_called:
+ warnings.warn(
+ "asynchronous generator {!r} was scheduled after "
+ "loop.shutdown_asyncgens() call".format(agen),
+ ResourceWarning, source=self)
+
+ self._asyncgens.add(agen)
+
+ @coroutine
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ self._asyncgens_shutdown_called = True
+
+ if self._asyncgens is None or not len(self._asyncgens):
+ # If Python version is <3.6 or we don't have any asynchronous
+ # generators alive.
+ return
+
+ closing_agens = list(self._asyncgens)
+ self._asyncgens.clear()
+
+ shutdown_coro = tasks.gather(
+ *[ag.aclose() for ag in closing_agens],
+ return_exceptions=True,
+ loop=self)
+
+ results = yield from shutdown_coro
+ for result, agen in zip(results, closing_agens):
+ if isinstance(result, Exception):
+ self.call_exception_handler({
+ 'message': 'an error occurred during closing of '
+ 'asynchronous generator {!r}'.format(agen),
+ 'exception': result,
+ 'asyncgen': agen
+ })
+
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
if self.is_running():
- raise RuntimeError('Event loop is running.')
+ raise RuntimeError('This event loop is already running')
+ if events._get_running_loop() is not None:
+ raise RuntimeError(
+ 'Cannot run the event loop while another loop is running')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
+ if self._asyncgens is not None:
+ old_agen_hooks = sys.get_asyncgen_hooks()
+ sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+ finalizer=self._asyncgen_finalizer_hook)
try:
+ events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
@@ -348,7 +424,10 @@
finally:
self._stopping = False
self._thread_id = None
+ events._set_running_loop(None)
self._set_coroutine_wrapper(False)
+ if self._asyncgens is not None:
+ sys.set_asyncgen_hooks(*old_agen_hooks)
def run_until_complete(self, future):
"""Run until the Future is done.
@@ -363,7 +442,7 @@
"""
self._check_closed()
- new_task = not isinstance(future, futures.Future)
+ new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
@@ -469,12 +548,10 @@
Absolute time corresponds to the event loop's time() method.
"""
- if (coroutines.iscoroutine(callback)
- or coroutines.iscoroutinefunction(callback)):
- raise TypeError("coroutines cannot be used with call_at()")
self._check_closed()
if self._debug:
self._check_thread()
+ self._check_callback(callback, 'call_at')
timer = events.TimerHandle(when, callback, args, self)
if timer._source_traceback:
del timer._source_traceback[-1]
@@ -492,18 +569,27 @@
Any positional arguments after the callback will be passed to
the callback when it is called.
"""
+ self._check_closed()
if self._debug:
self._check_thread()
+ self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
+ def _check_callback(self, callback, method):
+ if (coroutines.iscoroutine(callback) or
+ coroutines.iscoroutinefunction(callback)):
+ raise TypeError(
+ "coroutines cannot be used with {}()".format(method))
+ if not callable(callback):
+ raise TypeError(
+ 'a callable object was expected by {}(), got {!r}'.format(
+ method, callback))
+
+
def _call_soon(self, callback, args):
- if (coroutines.iscoroutine(callback)
- or coroutines.iscoroutinefunction(callback)):
- raise TypeError("coroutines cannot be used with call_soon()")
- self._check_closed()
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
@@ -529,6 +615,9 @@
def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread-safe."""
+ self._check_closed()
+ if self._debug:
+ self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_traceback[-1]
@@ -536,22 +625,13 @@
return handle
def run_in_executor(self, executor, func, *args):
- if (coroutines.iscoroutine(func)
- or coroutines.iscoroutinefunction(func)):
- raise TypeError("coroutines cannot be used with run_in_executor()")
self._check_closed()
- if isinstance(func, events.Handle):
- assert not args
- assert not isinstance(func, events.TimerHandle)
- if func._cancelled:
- f = self.create_future()
- f.set_result(None)
- return f
- func, args = func._callback, func._args
+ if self._debug:
+ self._check_callback(func, 'run_in_executor')
if executor is None:
executor = self._default_executor
if executor is None:
- executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
+ executor = concurrent.futures.ThreadPoolExecutor()
self._default_executor = executor
return futures.wrap_future(executor.submit(func, *args), loop=self)
@@ -703,11 +783,19 @@
raise OSError('Multiple exceptions: {}'.format(
', '.join(str(exc) for exc in exceptions)))
- elif sock is None:
- raise ValueError(
- 'host and port was not specified and no sock specified')
-
- sock.setblocking(False)
+ else:
+ if sock is None:
+ raise ValueError(
+ 'host and port was not specified and no sock specified')
+ if not _is_stream_socket(sock):
+ # We allow AF_INET, AF_INET6, AF_UNIX as long as they
+ # are SOCK_STREAM.
+ # We support passing AF_UNIX sockets even though we have
+ # a dedicated API for that: create_unix_connection.
+ # Disallowing AF_UNIX in this method would break backwards
+ # compatibility.
+ raise ValueError(
+ 'A Stream Socket was expected, got {!r}'.format(sock))
transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
@@ -721,14 +809,17 @@
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
- server_hostname):
+ server_hostname, server_side=False):
+
+ sock.setblocking(False)
+
protocol = protocol_factory()
waiter = self.create_future()
if ssl:
sslcontext = None if isinstance(ssl, bool) else ssl
transport = self._make_ssl_transport(
sock, protocol, sslcontext, waiter,
- server_side=False, server_hostname=server_hostname)
+ server_side=server_side, server_hostname=server_hostname)
else:
transport = self._make_socket_transport(sock, protocol, waiter)
@@ -748,6 +839,9 @@
allow_broadcast=None, sock=None):
"""Create datagram connection."""
if sock is not None:
+ if not _is_dgram_socket(sock):
+ raise ValueError(
+ 'A UDP Socket was expected, got {!r}'.format(sock))
if (local_addr or remote_addr or
family or proto or flags or
reuse_address or reuse_port or allow_broadcast):
@@ -813,12 +907,7 @@
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if reuse_port:
- if not hasattr(socket, 'SO_REUSEPORT'):
- raise ValueError(
- 'reuse_port not supported by socket module')
- else:
- sock.setsockopt(
- socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+ _set_reuseport(sock)
if allow_broadcast:
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
@@ -941,12 +1030,7 @@
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
if reuse_port:
- if not hasattr(socket, 'SO_REUSEPORT'):
- raise ValueError(
- 'reuse_port not supported by socket module')
- else:
- sock.setsockopt(
- socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
+ _set_reuseport(sock)
# Disable IPv4/IPv6 dual stack support (enabled by
# default on Linux) which makes a single socket
# listen on both address families.
@@ -968,18 +1052,44 @@
else:
if sock is None:
raise ValueError('Neither host/port nor sock were specified')
+ if not _is_stream_socket(sock):
+ raise ValueError(
+ 'A Stream Socket was expected, got {!r}'.format(sock))
sockets = [sock]
server = Server(self, sockets)
for sock in sockets:
sock.listen(backlog)
sock.setblocking(False)
- self._start_serving(protocol_factory, sock, ssl, server)
+ self._start_serving(protocol_factory, sock, ssl, server, backlog)
if self._debug:
logger.info("%r is serving", server)
return server
@coroutine
+ def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
+ """Handle an accepted connection.
+
+ This is used by servers that accept connections outside of
+ asyncio but that use asyncio to handle connections.
+
+ This method is a coroutine. When completed, the coroutine
+ returns a (transport, protocol) pair.
+ """
+ if not _is_stream_socket(sock):
+ raise ValueError(
+ 'A Stream Socket was expected, got {!r}'.format(sock))
+
+ transport, protocol = yield from self._create_connection_transport(
+ sock, protocol_factory, ssl, '', server_side=True)
+ if self._debug:
+ # Get the socket from the transport because SSL transport closes
+ # the old socket and creates a new SSL socket
+ sock = transport.get_extra_info('socket')
+ logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
+ return transport, protocol
+
+ @coroutine
def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = self.create_future()
@@ -1048,7 +1158,7 @@
transport = yield from self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug:
- logger.info('%s: %r' % (debug_log, transport))
+ logger.info('%s: %r', debug_log, transport)
return transport, protocol
@coroutine
@@ -1078,7 +1188,7 @@
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug:
- logger.info('%s: %r' % (debug_log, transport))
+ logger.info('%s: %r', debug_log, transport)
return transport, protocol
def get_exception_handler(self):
@@ -1158,7 +1268,9 @@
- 'handle' (optional): Handle instance;
- 'protocol' (optional): Protocol instance;
- 'transport' (optional): Transport instance;
- - 'socket' (optional): Socket instance.
+ - 'socket' (optional): Socket instance;
+ - 'asyncgen' (optional): Asynchronous generator that caused
+ the exception.
New keys may be introduced in the future.
diff --git a/lib-python/3/asyncio/base_subprocess.py b/lib-python/3/asyncio/base_subprocess.py
--- a/lib-python/3/asyncio/base_subprocess.py
+++ b/lib-python/3/asyncio/base_subprocess.py
@@ -3,7 +3,6 @@
import warnings
from . import compat
-from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@@ -87,6 +86,12 @@
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
raise NotImplementedError
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
def is_closing(self):
return self._closed
diff --git a/lib-python/3/asyncio/coroutines.py b/lib-python/3/asyncio/coroutines.py
--- a/lib-python/3/asyncio/coroutines.py
+++ b/lib-python/3/asyncio/coroutines.py
@@ -33,12 +33,16 @@
try:
_types_coroutine = types.coroutine
+ _types_CoroutineType = types.CoroutineType
except AttributeError:
+ # Python 3.4
_types_coroutine = None
+ _types_CoroutineType = None
try:
_inspect_iscoroutinefunction = inspect.iscoroutinefunction
except AttributeError:
+ # Python 3.4
_inspect_iscoroutinefunction = lambda func: False
try:
@@ -120,8 +124,8 @@
def send(self, value):
return self.gen.send(value)
- def throw(self, exc):
- return self.gen.throw(exc)
+ def throw(self, type, value=None, traceback=None):
+ return self.gen.throw(type, value, traceback)
def close(self):
return self.gen.close()
@@ -204,8 +208,8 @@
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
- isinstance(res, CoroWrapper):
+ if (futures.isfuture(res) or inspect.isgenerator(res) or
+ isinstance(res, CoroWrapper)):
res = yield from res
elif _AwaitableABC is not None:
# If 'func' returns an Awaitable (new in 3.5) we
@@ -238,19 +242,27 @@
w.__qualname__ = getattr(func, '__qualname__', None)
return w
- wrapper._is_coroutine = True # For iscoroutinefunction().
+ wrapper._is_coroutine = _is_coroutine # For iscoroutinefunction().
return wrapper
+# A marker for iscoroutinefunction.
+_is_coroutine = object()
+
+
def iscoroutinefunction(func):
"""Return True if func is a decorated coroutine function."""
- return (getattr(func, '_is_coroutine', False) or
+ return (getattr(func, '_is_coroutine', None) is _is_coroutine or
_inspect_iscoroutinefunction(func))
_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
if _CoroutineABC is not None:
_COROUTINE_TYPES += (_CoroutineABC,)
+if _types_CoroutineType is not None:
+ # Prioritize the native coroutine check to speed up
+ # asyncio.iscoroutine.
+ _COROUTINE_TYPES = (_types_CoroutineType,) + _COROUTINE_TYPES
def iscoroutine(obj):
@@ -261,6 +273,29 @@
def _format_coroutine(coro):
assert iscoroutine(coro)
+ if not hasattr(coro, 'cr_code') and not hasattr(coro, 'gi_code'):
+ # Most likely a built-in type or a Cython coroutine.
+
+ # Built-in types might not have __qualname__ or __name__.
+ coro_name = getattr(
+ coro, '__qualname__',
+ getattr(coro, '__name__', type(coro).__name__))
+ coro_name = '{}()'.format(coro_name)
+
+ running = False
+ try:
+ running = coro.cr_running
+ except AttributeError:
+ try:
+ running = coro.gi_running
+ except AttributeError:
+ pass
+
+ if running:
+ return '{} running'.format(coro_name)
+ else:
+ return coro_name
+
coro_name = None
if isinstance(coro, CoroWrapper):
func = coro.func
@@ -271,7 +306,7 @@
func = coro
if coro_name is None:
- coro_name = events._format_callback(func, ())
+ coro_name = events._format_callback(func, (), {})
try:
coro_code = coro.gi_code
diff --git a/lib-python/3/asyncio/events.py b/lib-python/3/asyncio/events.py
--- a/lib-python/3/asyncio/events.py
+++ b/lib-python/3/asyncio/events.py
@@ -6,6 +6,7 @@
'get_event_loop_policy', 'set_event_loop_policy',
'get_event_loop', 'set_event_loop', 'new_event_loop',
'get_child_watcher', 'set_child_watcher',
+ '_set_running_loop', '_get_running_loop',
]
import functools
@@ -35,23 +36,25 @@
return None
-def _format_args(args):
- """Format function arguments.
+def _format_args_and_kwargs(args, kwargs):
+ """Format function arguments and keyword arguments.
Special case for a single parameter: ('hello',) is formatted as ('hello').
"""
# use reprlib to limit the length of the output
- args_repr = reprlib.repr(args)
- if len(args) == 1 and args_repr.endswith(',)'):
- args_repr = args_repr[:-2] + ')'
- return args_repr
+ items = []
+ if args:
+ items.extend(reprlib.repr(arg) for arg in args)
+ if kwargs:
+ items.extend('{}={}'.format(k, reprlib.repr(v))
+ for k, v in kwargs.items())
+ return '(' + ', '.join(items) + ')'
-def _format_callback(func, args, suffix=''):
+def _format_callback(func, args, kwargs, suffix=''):
if isinstance(func, functools.partial):
- if args is not None:
- suffix = _format_args(args) + suffix
- return _format_callback(func.func, func.args, suffix)
+ suffix = _format_args_and_kwargs(args, kwargs) + suffix
+ return _format_callback(func.func, func.args, func.keywords, suffix)
if hasattr(func, '__qualname__'):
func_repr = getattr(func, '__qualname__')
@@ -60,14 +63,13 @@
else:
func_repr = repr(func)
- if args is not None:
- func_repr += _format_args(args)
+ func_repr += _format_args_and_kwargs(args, kwargs)
if suffix:
func_repr += suffix
return func_repr
def _format_callback_source(func, args):
- func_repr = _format_callback(func, args)
+ func_repr = _format_callback(func, args, None)
source = _get_function_source(func)
if source:
func_repr += ' at %s:%s' % source
@@ -81,7 +83,6 @@
'_source_traceback', '_repr', '__weakref__')
def __init__(self, callback, args, loop):
- assert not isinstance(callback, Handle), 'A Handle is not a callback'
self._loop = loop
self._callback = callback
self._args = args
@@ -248,6 +249,10 @@
"""
raise NotImplementedError
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ raise NotImplementedError
+
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):
@@ -603,6 +608,30 @@
_lock = threading.Lock()
+# A TLS for the running event loop, used by _get_running_loop.
+class _RunningLoop(threading.local):
+ _loop = None
+_running_loop = _RunningLoop()
+
+
+def _get_running_loop():
+ """Return the running event loop or None.
+
+ This is a low-level function intended to be used by event loops.
+ This function is thread-specific.
+ """
+ return _running_loop._loop
+
+
+def _set_running_loop(loop):
+ """Set the running event loop.
+
+ This is a low-level function intended to be used by event loops.
+ This function is thread-specific.
+ """
+ _running_loop._loop = loop
+
+
def _init_event_loop_policy():
global _event_loop_policy
with _lock:
@@ -628,7 +657,17 @@
def get_event_loop():
- """Equivalent to calling get_event_loop_policy().get_event_loop()."""
+ """Return an asyncio event loop.
+
+ When called from a coroutine or a callback (e.g. scheduled with call_soon
+ or similar API), this function will always return the running event loop.
+
+ If there is no running event loop set, the function will return
+ the result of `get_event_loop_policy().get_event_loop()` call.
+ """
+ current_loop = _get_running_loop()
+ if current_loop is not None:
+ return current_loop
return get_event_loop_policy().get_event_loop()
diff --git a/lib-python/3/asyncio/futures.py b/lib-python/3/asyncio/futures.py
--- a/lib-python/3/asyncio/futures.py
+++ b/lib-python/3/asyncio/futures.py
@@ -2,7 +2,7 @@
__all__ = ['CancelledError', 'TimeoutError',
'InvalidStateError',
- 'Future', 'wrap_future',
+ 'Future', 'wrap_future', 'isfuture',
]
import concurrent.futures._base
@@ -110,6 +110,17 @@
self.loop.call_exception_handler({'message': msg})
+def isfuture(obj):
+ """Check for a Future.
+
+ This returns True when obj is a Future instance or is advertising
+ itself as duck-type compatible by setting _asyncio_future_blocking.
+ See comment in Future for more details.
+ """
+ return (hasattr(obj.__class__, '_asyncio_future_blocking') and
+ obj._asyncio_future_blocking is not None)
+
+
class Future:
"""This class is *almost* compatible with concurrent.futures.Future.
@@ -134,7 +145,15 @@
_loop = None
_source_traceback = None
- _blocking = False # proper use of future (yield vs yield from)
+ # This field is used for a dual purpose:
+ # - Its presence is a marker to declare that a class implements
+ # the Future protocol (i.e. is intended to be duck-type compatible).
+ # The value must also be not-None, to enable a subclass to declare
+ # that it is not compatible by setting this to None.
+ # - It is set by __iter__() below so that Task._step() can tell
+ # the difference between `yield from Future()` (correct) vs.
+ # `yield Future()` (incorrect).
+ _asyncio_future_blocking = False
_log_traceback = False # Used for Python 3.4 and later
_tb_logger = None # Used for Python 3.3 only
@@ -357,7 +376,7 @@
def __iter__(self):
if not self.done():
- self._blocking = True
+ self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
@@ -415,15 +434,17 @@
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
- if not isinstance(source, (Future, concurrent.futures.Future)):
+ if not isfuture(source) and not isinstance(source,
+ concurrent.futures.Future):
raise TypeError('A future is required for source argument')
- if not isinstance(destination, (Future, concurrent.futures.Future)):
+ if not isfuture(destination) and not isinstance(destination,
+ concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
- source_loop = source._loop if isinstance(source, Future) else None
- dest_loop = destination._loop if isinstance(destination, Future) else None
+ source_loop = source._loop if isfuture(source) else None
+ dest_loop = destination._loop if isfuture(destination) else None
def _set_state(future, other):
- if isinstance(future, Future):
+ if isfuture(future):
_copy_future_state(other, future)
else:
_set_concurrent_future_state(future, other)
@@ -447,7 +468,7 @@
def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
- if isinstance(future, Future):
+ if isfuture(future):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
diff --git a/lib-python/3/asyncio/locks.py b/lib-python/3/asyncio/locks.py
--- a/lib-python/3/asyncio/locks.py
+++ b/lib-python/3/asyncio/locks.py
@@ -166,7 +166,7 @@
This method blocks until the lock is unlocked, then sets it to
locked and returns True.
"""
- if not self._waiters and not self._locked:
+ if not self._locked and all(w.cancelled() for w in self._waiters):
self._locked = True
return True
diff --git a/lib-python/3/asyncio/proactor_events.py b/lib-python/3/asyncio/proactor_events.py
--- a/lib-python/3/asyncio/proactor_events.py
+++ b/lib-python/3/asyncio/proactor_events.py
@@ -66,6 +66,12 @@
def _set_extra(self, sock):
self._extra['pipe'] = sock
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
def is_closing(self):
return self._closing
@@ -488,7 +494,7 @@
self._csock.send(b'\0')
def _start_serving(self, protocol_factory, sock,
- sslcontext=None, server=None):
+ sslcontext=None, server=None, backlog=100):
def loop(f=None):
try:
diff --git a/lib-python/3/asyncio/queues.py b/lib-python/3/asyncio/queues.py
--- a/lib-python/3/asyncio/queues.py
+++ b/lib-python/3/asyncio/queues.py
@@ -7,7 +7,6 @@
from . import compat
from . import events
-from . import futures
from . import locks
from .coroutines import coroutine
diff --git a/lib-python/3/asyncio/selector_events.py b/lib-python/3/asyncio/selector_events.py
--- a/lib-python/3/asyncio/selector_events.py
+++ b/lib-python/3/asyncio/selector_events.py
@@ -11,6 +11,7 @@
import functools
import socket
import warnings
+import weakref
try:
import ssl
except ImportError: # pragma: no cover
@@ -39,6 +40,17 @@
return bool(key.events & event)
+if hasattr(socket, 'TCP_NODELAY'):
+ def _set_nodelay(sock):
+ if (sock.family in {socket.AF_INET, socket.AF_INET6} and
+ sock.type == socket.SOCK_STREAM and
+ sock.proto == socket.IPPROTO_TCP):
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+else:
+ def _set_nodelay(sock):
+ pass
+
+
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@@ -53,6 +65,7 @@
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
self._make_self_pipe()
+ self._transports = weakref.WeakValueDictionary()
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
@@ -104,7 +117,7 @@
raise NotImplementedError
def _close_self_pipe(self):
- self.remove_reader(self._ssock.fileno())
+ self._remove_reader(self._ssock.fileno())
self._ssock.close()
self._ssock = None
self._csock.close()
@@ -117,7 +130,7 @@
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
- self.add_reader(self._ssock.fileno(), self._read_from_self)
+ self._add_reader(self._ssock.fileno(), self._read_from_self)
def _process_self_data(self, data):
pass
@@ -151,43 +164,50 @@
exc_info=True)
def _start_serving(self, protocol_factory, sock,
- sslcontext=None, server=None):
- self.add_reader(sock.fileno(), self._accept_connection,
- protocol_factory, sock, sslcontext, server)
+ sslcontext=None, server=None, backlog=100):
+ self._add_reader(sock.fileno(), self._accept_connection,
+ protocol_factory, sock, sslcontext, server, backlog)
def _accept_connection(self, protocol_factory, sock,
- sslcontext=None, server=None):
- try:
- conn, addr = sock.accept()
- if self._debug:
- logger.debug("%r got a new connection from %r: %r",
- server, addr, conn)
- conn.setblocking(False)
- except (BlockingIOError, InterruptedError, ConnectionAbortedError):
- pass # False alarm.
- except OSError as exc:
- # There's nowhere to send the error, so just log it.
- if exc.errno in (errno.EMFILE, errno.ENFILE,
- errno.ENOBUFS, errno.ENOMEM):
- # Some platforms (e.g. Linux keep reporting the FD as
- # ready, so we remove the read handler temporarily.
- # We'll try again in a while.
- self.call_exception_handler({
- 'message': 'socket.accept() out of system resource',
- 'exception': exc,
- 'socket': sock,
- })
- self.remove_reader(sock.fileno())
- self.call_later(constants.ACCEPT_RETRY_DELAY,
- self._start_serving,
- protocol_factory, sock, sslcontext, server)
+ sslcontext=None, server=None, backlog=100):
+ # This method is only called once for each event loop tick where the
+ # listening socket has triggered an EVENT_READ. There may be multiple
+ # connections waiting for an .accept() so it is called in a loop.
+ # See https://bugs.python.org/issue27906 for more details.
+ for _ in range(backlog):
+ try:
+ conn, addr = sock.accept()
+ if self._debug:
+ logger.debug("%r got a new connection from %r: %r",
+ server, addr, conn)
+ conn.setblocking(False)
+ except (BlockingIOError, InterruptedError, ConnectionAbortedError):
+ # Early exit because the socket accept buffer is empty.
+ return None
+ except OSError as exc:
+ # There's nowhere to send the error, so just log it.
+ if exc.errno in (errno.EMFILE, errno.ENFILE,
+ errno.ENOBUFS, errno.ENOMEM):
+ # Some platforms (e.g. Linux keep reporting the FD as
+ # ready, so we remove the read handler temporarily.
+ # We'll try again in a while.
+ self.call_exception_handler({
+ 'message': 'socket.accept() out of system resource',
+ 'exception': exc,
+ 'socket': sock,
+ })
+ self._remove_reader(sock.fileno())
+ self.call_later(constants.ACCEPT_RETRY_DELAY,
+ self._start_serving,
+ protocol_factory, sock, sslcontext, server,
+ backlog)
+ else:
+ raise # The event loop will catch, log and ignore it.
else:
- raise # The event loop will catch, log and ignore it.
- else:
- extra = {'peername': addr}
- accept = self._accept_connection2(protocol_factory, conn, extra,
- sslcontext, server)
- self.create_task(accept)
+ extra = {'peername': addr}
+ accept = self._accept_connection2(protocol_factory, conn, extra,
+ sslcontext, server)
+ self.create_task(accept)
@coroutine
def _accept_connection2(self, protocol_factory, conn, extra,
@@ -226,8 +246,18 @@
context['transport'] = transport
self.call_exception_handler(context)
- def add_reader(self, fd, callback, *args):
- """Add a reader callback."""
+ def _ensure_fd_no_transport(self, fd):
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ if not transport.is_closing():
+ raise RuntimeError(
+ 'File descriptor {!r} is used by transport {!r}'.format(
+ fd, transport))
+
+ def _add_reader(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -242,8 +272,7 @@
if reader is not None:
reader.cancel()
- def remove_reader(self, fd):
- """Remove a reader callback."""
+ def _remove_reader(self, fd):
if self.is_closed():
return False
try:
@@ -264,8 +293,7 @@
else:
return False
- def add_writer(self, fd, callback, *args):
- """Add a writer callback.."""
+ def _add_writer(self, fd, callback, *args):
self._check_closed()
handle = events.Handle(callback, args, self)
try:
@@ -280,7 +308,7 @@
if writer is not None:
writer.cancel()
- def remove_writer(self, fd):
+ def _remove_writer(self, fd):
"""Remove a writer callback."""
if self.is_closed():
return False
@@ -303,6 +331,26 @@
else:
return False
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_reader(fd, callback, *args)
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_writer(fd, callback, *args)
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_writer(fd)
+
def sock_recv(self, sock, n):
"""Receive data from the socket.
@@ -382,6 +430,7 @@
data = data[n:]
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
+ @coroutine
def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
@@ -390,23 +439,16 @@
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
+ if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
+ resolved = base_events._ensure_resolved(
+ address, family=sock.family, proto=sock.proto, loop=self)
+ if not resolved.done():
+ yield from resolved
+ _, _, _, _, address = resolved.result()[0]
+
fut = self.create_future()
- if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
- self._sock_connect(fut, sock, address)
- else:
- resolved = base_events._ensure_resolved(address, loop=self)
- resolved.add_done_callback(
- lambda resolved: self._on_resolved(fut, sock, resolved))
-
- return fut
-
- def _on_resolved(self, fut, sock, resolved):
- try:
- _, _, _, _, address = resolved.result()[0]
- except Exception as exc:
- fut.set_exception(exc)
- else:
- self._sock_connect(fut, sock, address)
+ self._sock_connect(fut, sock, address)
+ return (yield from fut)
def _sock_connect(self, fut, sock, address):
fd = sock.fileno()
@@ -417,8 +459,8 @@
# connection runs in background. We have to wait until the socket
# becomes writable to be notified when the connection succeed or
# fails.
- fut.add_done_callback(functools.partial(self._sock_connect_done,
- fd))
+ fut.add_done_callback(
+ functools.partial(self._sock_connect_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
@@ -482,17 +524,17 @@
fileobj, (reader, writer) = key.fileobj, key.data
if mask & selectors.EVENT_READ and reader is not None:
if reader._cancelled:
- self.remove_reader(fileobj)
+ self._remove_reader(fileobj)
else:
self._add_callback(reader)
if mask & selectors.EVENT_WRITE and writer is not None:
if writer._cancelled:
- self.remove_writer(fileobj)
+ self._remove_writer(fileobj)
else:
self._add_callback(writer)
def _stop_serving(self, sock):
- self.remove_reader(sock.fileno())
+ self._remove_reader(sock.fileno())
sock.close()
@@ -527,6 +569,7 @@
self._closing = False # Set when close() called.
if self._server is not None:
self._server._attach()
+ loop._transports[self._sock_fd] = self
def __repr__(self):
info = [self.__class__.__name__]
@@ -559,6 +602,12 @@
def abort(self):
self._force_close(None)
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
def is_closing(self):
return self._closing
@@ -566,10 +615,10 @@
if self._closing:
return
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if not self._buffer:
self._conn_lost += 1
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
@@ -600,10 +649,10 @@
return
if self._buffer:
self._buffer.clear()
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if not self._closing:
self._closing = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, exc)
@@ -633,9 +682,14 @@
self._eof = False
self._paused = False
+ # Disable the Nagle algorithm -- small writes will be
+ # sent without waiting for the TCP ACK. This generally
+ # decreases the latency (in some cases significantly.)
+ _set_nodelay(self._sock)
+
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -648,7 +702,7 @@
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -658,7 +712,7 @@
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -682,7 +736,7 @@
# We're keeping the connection open so the
# protocol can write more, but we still can't
# receive more, so remove the reader callback.
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
else:
self.close()
@@ -715,7 +769,7 @@
if not data:
return
# Not all was written; register write handler.
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -731,7 +785,7 @@
except (BlockingIOError, InterruptedError):
pass
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on socket transport')
else:
@@ -739,7 +793,7 @@
del self._buffer[:n]
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
elif self._eof:
@@ -810,19 +864,19 @@
try:
self._sock.do_handshake()
except ssl.SSLWantReadError:
- self._loop.add_reader(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_reader(self._sock_fd,
+ self._on_handshake, start_time)
return
except ssl.SSLWantWriteError:
- self._loop.add_writer(self._sock_fd,
- self._on_handshake, start_time)
+ self._loop._add_writer(self._sock_fd,
+ self._on_handshake, start_time)
return
except BaseException as exc:
if self._loop.get_debug():
logger.warning("%r: SSL handshake failed",
self, exc_info=True)
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._sock.close()
self._wakeup_waiter(exc)
if isinstance(exc, Exception):
@@ -830,8 +884,8 @@
else:
raise
- self._loop.remove_reader(self._sock_fd)
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
peercert = self._sock.getpeercert()
if not hasattr(self._sslcontext, 'check_hostname'):
@@ -859,7 +913,7 @@
self._read_wants_write = False
self._write_wants_read = False
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
@@ -881,7 +935,7 @@
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
- self._loop.remove_reader(self._sock_fd)
+ self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@@ -891,7 +945,7 @@
self._paused = False
if self._closing:
return
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -903,7 +957,7 @@
self._write_ready()
if self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
@@ -911,8 +965,8 @@
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
- self._loop.remove_reader(self._sock_fd)
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._remove_reader(self._sock_fd)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc, 'Fatal read error on SSL transport')
else:
@@ -937,7 +991,7 @@
self._read_ready()
if not (self._paused or self._closing):
- self._loop.add_reader(self._sock_fd, self._read_ready)
+ self._loop._add_reader(self._sock_fd, self._read_ready)
if self._buffer:
try:
@@ -946,10 +1000,10 @@
n = 0
except ssl.SSLWantReadError:
n = 0
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
self._buffer.clear()
self._fatal_error(exc, 'Fatal write error on SSL transport')
return
@@ -960,7 +1014,7 @@
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
@@ -978,7 +1032,7 @@
return
if not self._buffer:
- self._loop.add_writer(self._sock_fd, self._write_ready)
+ self._loop._add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.extend(data)
@@ -998,7 +1052,7 @@
self._address = address
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
@@ -1048,7 +1102,7 @@
self._sock.sendto(data, addr)
return
except (BlockingIOError, InterruptedError):
- self._loop.add_writer(self._sock_fd, self._sendto_ready)
+ self._loop._add_writer(self._sock_fd, self._sendto_ready)
except OSError as exc:
self._protocol.error_received(exc)
return
@@ -1082,6 +1136,6 @@
self._maybe_resume_protocol() # May append to buffer.
if not self._buffer:
- self._loop.remove_writer(self._sock_fd)
+ self._loop._remove_writer(self._sock_fd)
if self._closing:
self._call_connection_lost(None)
diff --git a/lib-python/3/asyncio/sslproto.py b/lib-python/3/asyncio/sslproto.py
--- a/lib-python/3/asyncio/sslproto.py
+++ b/lib-python/3/asyncio/sslproto.py
@@ -5,6 +5,7 @@
except ImportError: # pragma: no cover
ssl = None
+from . import base_events
from . import compat
from . import protocols
from . import transports
@@ -304,6 +305,12 @@
"""Get optional transport information."""
return self._ssl_protocol._get_extra_info(name, default)
+ def set_protocol(self, protocol):
+ self._app_protocol = protocol
+
+ def get_protocol(self):
+ return self._app_protocol
+
def is_closing(self):
return self._closed
@@ -403,7 +410,8 @@
"""
def __init__(self, loop, app_protocol, sslcontext, waiter,
- server_side=False, server_hostname=None):
+ server_side=False, server_hostname=None,
+ call_connection_made=True):
if ssl is None:
raise RuntimeError('stdlib ssl module not available')
@@ -436,6 +444,7 @@
self._in_shutdown = False
# transport, ex: SelectorSocketTransport
self._transport = None
+ self._call_connection_made = call_connection_made
def _wakeup_waiter(self, exc=None):
if self._waiter is None:
@@ -470,6 +479,7 @@
self._loop.call_soon(self._app_protocol.connection_lost, exc)
self._transport = None
self._app_transport = None
+ self._wakeup_waiter(exc)
def pause_writing(self):
"""Called when the low-level transport's buffer goes over
@@ -599,7 +609,8 @@
compression=sslobj.compression(),
ssl_object=sslobj,
)
- self._app_protocol.connection_made(self._app_transport)
+ if self._call_connection_made:
+ self._app_protocol.connection_made(self._app_transport)
self._wakeup_waiter()
self._session_established = True
# In case transport.write() was already called. Don't call
diff --git a/lib-python/3/asyncio/streams.py b/lib-python/3/asyncio/streams.py
--- a/lib-python/3/asyncio/streams.py
+++ b/lib-python/3/asyncio/streams.py
@@ -448,6 +448,7 @@
assert not self._eof, '_wait_for_data after EOF'
# Waiting for data while paused will make deadlock, so prevent it.
+ # This is essential for readexactly(n) for case when n > self._limit.
if self._paused:
self._paused = False
self._transport.resume_reading()
@@ -590,7 +591,7 @@
bytes. If the EOF was received and the internal buffer is empty, return
an empty bytes object.
- If n is zero, return empty bytes object immediatelly.
+ If n is zero, return empty bytes object immediately.
If n is positive, this function try to read `n` bytes, and may return
less or equal bytes than requested, but at least one byte. If EOF was
@@ -658,25 +659,22 @@
if n == 0:
return b''
- # There used to be "optimized" code here. It created its own
- # Future and waited until self._buffer had at least the n
- # bytes, then called read(n). Unfortunately, this could pause
- # the transport if the argument was larger than the pause
- # limit (which is twice self._limit). So now we just read()
- # into a local buffer.
+ while len(self._buffer) < n:
+ if self._eof:
+ incomplete = bytes(self._buffer)
+ self._buffer.clear()
+ raise IncompleteReadError(incomplete, n)
- blocks = []
- while n > 0:
- block = yield from self.read(n)
- if not block:
- partial = b''.join(blocks)
- raise IncompleteReadError(partial, len(partial) + n)
- blocks.append(block)
- n -= len(block)
+ yield from self._wait_for_data('readexactly')
- assert n == 0
-
- return b''.join(blocks)
+ if len(self._buffer) == n:
+ data = bytes(self._buffer)
+ self._buffer.clear()
+ else:
+ data = bytes(self._buffer[:n])
+ del self._buffer[:n]
+ self._maybe_resume_transport()
+ return data
if compat.PY35:
@coroutine
diff --git a/lib-python/3/asyncio/tasks.py b/lib-python/3/asyncio/tasks.py
--- a/lib-python/3/asyncio/tasks.py
+++ b/lib-python/3/asyncio/tasks.py
@@ -241,7 +241,7 @@
result = coro.throw(exc)
except StopIteration as exc:
self.set_result(exc.value)
- except futures.CancelledError as exc:
+ except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
self.set_exception(exc)
@@ -249,7 +249,8 @@
self.set_exception(exc)
raise
else:
- if isinstance(result, futures.Future):
+ blocking = getattr(result, '_asyncio_future_blocking', None)
+ if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
self._loop.call_soon(
@@ -257,13 +258,20 @@
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
- elif result._blocking:
- result._blocking = False
- result.add_done_callback(self._wakeup)
- self._fut_waiter = result
- if self._must_cancel:
- if self._fut_waiter.cancel():
- self._must_cancel = False
+ elif blocking:
+ if result is self:
+ self._loop.call_soon(
+ self._step,
+ RuntimeError(
+ 'Task cannot await on itself: {!r}'.format(
+ self)))
+ else:
+ result._asyncio_future_blocking = False
+ result.add_done_callback(self._wakeup)
+ self._fut_waiter = result
+ if self._must_cancel:
+ if self._fut_waiter.cancel():
+ self._must_cancel = False
else:
self._loop.call_soon(
self._step,
@@ -332,7 +340,7 @@
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
@@ -461,7 +469,7 @@
Note: The futures 'f' are not necessarily members of fs.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
@@ -518,7 +526,7 @@
h.cancel()
-def async(coro_or_future, *, loop=None):
+def async_(coro_or_future, *, loop=None):
"""Wrap a coroutine in a future.
If the argument is a Future, it is returned directly.
@@ -531,13 +539,18 @@
return ensure_future(coro_or_future, loop=loop)
+# Silence DeprecationWarning:
+globals()['async'] = async_
+async_.__name__ = 'async'
+del async_
+
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
"""
- if isinstance(coro_or_future, futures.Future):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
@@ -579,15 +592,21 @@
def cancel(self):
if self.done():
return False
+ ret = False
for child in self._children:
- child.cancel()
- return True
+ if child.cancel():
+ ret = True
+ return ret
def gather(*coros_or_futures, loop=None, return_exceptions=False):
"""Return a future aggregating results from the given coroutines
or futures.
+ Coroutines will be wrapped in a future and scheduled in the event
+ loop. They will not necessarily be scheduled in the same order as
+ passed in.
+
All futures must share the same event loop. If all the tasks are
done successfully, the returned future's result is the list of
results (in the order of the original sequence, not necessarily
@@ -613,7 +632,7 @@
arg_to_fut = {}
for arg in set(coros_or_futures):
- if not isinstance(arg, futures.Future):
+ if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
diff --git a/lib-python/3/asyncio/test_utils.py b/lib-python/3/asyncio/test_utils.py
--- a/lib-python/3/asyncio/test_utils.py
+++ b/lib-python/3/asyncio/test_utils.py
@@ -13,6 +13,8 @@
import threading
import time
import unittest
+import weakref
+
from unittest import mock
from http.server import HTTPServer
@@ -300,6 +302,8 @@
self.writers = {}
self.reset_counters()
+ self._transports = weakref.WeakValueDictionary()
+
def time(self):
return self._time
@@ -318,10 +322,10 @@
else: # pragma: no cover
raise AssertionError("Time generator is not finished")
- def add_reader(self, fd, callback, *args):
+ def _add_reader(self, fd, callback, *args):
self.readers[fd] = events.Handle(callback, args, self)
- def remove_reader(self, fd):
+ def _remove_reader(self, fd):
self.remove_reader_count[fd] += 1
if fd in self.readers:
del self.readers[fd]
@@ -337,10 +341,10 @@
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
- def add_writer(self, fd, callback, *args):
+ def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)
- def remove_writer(self, fd):
+ def _remove_writer(self, fd):
self.remove_writer_count[fd] += 1
if fd in self.writers:
del self.writers[fd]
@@ -356,6 +360,36 @@
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
+ def _ensure_fd_no_transport(self, fd):
+ try:
+ transport = self._transports[fd]
+ except KeyError:
+ pass
+ else:
+ raise RuntimeError(
+ 'File descriptor {!r} is used by transport {!r}'.format(
+ fd, transport))
+
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_reader(fd, callback, *args)
+
+ def remove_reader(self, fd):
+ """Remove a reader callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_reader(fd)
+
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._ensure_fd_no_transport(fd)
+ return self._add_writer(fd, callback, *args)
+
+ def remove_writer(self, fd):
+ """Remove a writer callback."""
+ self._ensure_fd_no_transport(fd)
+ return self._remove_writer(fd)
+
def reset_counters(self):
self.remove_reader_count = collections.defaultdict(int)
self.remove_writer_count = collections.defaultdict(int)
@@ -415,7 +449,13 @@
self.set_event_loop(loop)
return loop
+ def setUp(self):
+ self._get_running_loop = events._get_running_loop
+ events._get_running_loop = lambda: None
+
def tearDown(self):
+ events._get_running_loop = self._get_running_loop
+
events.set_event_loop(None)
# Detect CPython bug #23353: ensure that yield/yield-from is not used
diff --git a/lib-python/3/asyncio/transports.py b/lib-python/3/asyncio/transports.py
--- a/lib-python/3/asyncio/transports.py
+++ b/lib-python/3/asyncio/transports.py
@@ -33,6 +33,14 @@
"""
raise NotImplementedError
+ def set_protocol(self, protocol):
+ """Set a new protocol."""
+ raise NotImplementedError
+
+ def get_protocol(self):
+ """Return the current protocol."""
+ raise NotImplementedError
+
class ReadTransport(BaseTransport):
"""Interface for read-only transports."""
diff --git a/lib-python/3/asyncio/unix_events.py b/lib-python/3/asyncio/unix_events.py
--- a/lib-python/3/asyncio/unix_events.py
+++ b/lib-python/3/asyncio/unix_events.py
@@ -39,6 +39,13 @@
pass
+try:
+ _fspath = os.fspath
+except AttributeError:
+ # Python 3.5 or earlier
+ _fspath = lambda path: path
+
+
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
"""Unix event loop.
@@ -234,6 +241,11 @@
else:
if sock is None:
raise ValueError('no path and sock were specified')
+ if (sock.family != socket.AF_UNIX or
+ not base_events._is_stream_socket(sock)):
+ raise ValueError(
+ 'A UNIX Domain Stream Socket was expected, got {!r}'
+ .format(sock))
sock.setblocking(False)
transport, protocol = yield from self._create_connection_transport(
@@ -251,8 +263,20 @@
raise ValueError(
'path and sock can not be specified at the same time')
+ path = _fspath(path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ # Check for abstract socket. `str` and `bytes` paths are supported.
+ if path[0] not in (0, '\x00'):
+ try:
+ if stat.S_ISSOCK(os.stat(path).st_mode):
+ os.remove(path)
+ except FileNotFoundError:
+ pass
+ except OSError as err:
+ # Directory may have permissions only to create socket.
+ logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
+
try:
sock.bind(path)
except OSError as exc:
@@ -272,9 +296,11 @@
raise ValueError(
'path was not specified, and no sock specified')
- if sock.family != socket.AF_UNIX:
+ if (sock.family != socket.AF_UNIX or
+ not base_events._is_stream_socket(sock)):
raise ValueError(
- 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
+ 'A UNIX Domain Stream Socket was expected, got {!r}'
+ .format(sock))
server = base_events.Server(self, [sock])
sock.listen(backlog)
@@ -305,17 +331,23 @@
self._loop = loop
self._pipe = pipe
self._fileno = pipe.fileno()
+ self._protocol = protocol
+ self._closing = False
+
mode = os.fstat(self._fileno).st_mode
if not (stat.S_ISFIFO(mode) or
stat.S_ISSOCK(mode) or
stat.S_ISCHR(mode)):
+ self._pipe = None
+ self._fileno = None
+ self._protocol = None
raise ValueError("Pipe transport is for pipes/sockets only.")
+
_set_nonblocking(self._fileno)
- self._protocol = protocol
- self._closing = False
+
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
- self._loop.call_soon(self._loop.add_reader,
+ self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
More information about the pypy-commit
mailing list