[pypy-commit] pypy vendor/stdlib-3.5: Import stdlib from CPython 3.5.3 (changeset 'v3.5.3')

arigo pypy.commits at gmail.com
Sun Feb 5 13:44:18 EST 2017


Author: Armin Rigo <arigo at tunes.org>
Branch: vendor/stdlib-3.5
Changeset: r89950:a9167602e69c
Date: 2017-02-05 19:12 +0100
http://bitbucket.org/pypy/pypy/changeset/a9167602e69c/

Log:	Import stdlib from CPython 3.5.3 (changeset 'v3.5.3')

diff too long, truncating to 2000 out of 24911 lines

diff --git a/lib-python/3/_collections_abc.py b/lib-python/3/_collections_abc.py
--- a/lib-python/3/_collections_abc.py
+++ b/lib-python/3/_collections_abc.py
@@ -29,8 +29,8 @@
 # so that they will pass tests like:
 #       it = iter(somebytearray)
 #       assert isinstance(it, Iterable)
-# Note:  in other implementations, these types many not be distinct
-# and they make have their own implementation specific types that
+# Note:  in other implementations, these types might not be distinct
+# and they may have their own implementation specific types that
 # are not included on this list.
 bytes_iterator = type(iter(b''))
 bytearray_iterator = type(iter(bytearray()))
@@ -41,6 +41,7 @@
 list_iterator = type(iter([]))
 list_reverseiterator = type(iter(reversed([])))
 range_iterator = type(iter(range(0)))
+longrange_iterator = type(iter(range(1 << 1000)))
 set_iterator = type(iter(set()))
 str_iterator = type(iter(""))
 tuple_iterator = type(iter(()))
@@ -234,6 +235,7 @@
 Iterator.register(list_iterator)
 Iterator.register(list_reverseiterator)
 Iterator.register(range_iterator)
+Iterator.register(longrange_iterator)
 Iterator.register(set_iterator)
 Iterator.register(str_iterator)
 Iterator.register(tuple_iterator)
diff --git a/lib-python/3/_pydecimal.py b/lib-python/3/_pydecimal.py
--- a/lib-python/3/_pydecimal.py
+++ b/lib-python/3/_pydecimal.py
@@ -1068,12 +1068,11 @@
         return sign + intpart + fracpart + exp
 
     def to_eng_string(self, context=None):
-        """Convert to engineering-type string.
-
-        Engineering notation has an exponent which is a multiple of 3, so there
-        are up to 3 digits left of the decimal place.
-
-        Same rules for when in exponential and when as a value as in __str__.
+        """Convert to a string, using engineering notation if an exponent is needed.
+
+        Engineering notation has an exponent which is a multiple of 3.  This
+        can leave up to 3 digits to the left of the decimal place and may
+        require the addition of either one or two trailing zeros.
         """
         return self.__str__(eng=True, context=context)
 
@@ -4107,7 +4106,7 @@
         >>> context.create_decimal_from_float(3.1415926535897932)
         Traceback (most recent call last):
             ...
-        decimal.Inexact
+        decimal.Inexact: None
 
         """
         d = Decimal.from_float(f)       # An exact conversion
@@ -5502,9 +5501,29 @@
             return r
 
     def to_eng_string(self, a):
-        """Converts a number to a string, using scientific notation.
+        """Convert to a string, using engineering notation if an exponent is needed.
+
+        Engineering notation has an exponent which is a multiple of 3.  This
+        can leave up to 3 digits to the left of the decimal place and may
+        require the addition of either one or two trailing zeros.
 
         The operation is not affected by the context.
+
+        >>> ExtendedContext.to_eng_string(Decimal('123E+1'))
+        '1.23E+3'
+        >>> ExtendedContext.to_eng_string(Decimal('123E+3'))
+        '123E+3'
+        >>> ExtendedContext.to_eng_string(Decimal('123E-10'))
+        '12.3E-9'
+        >>> ExtendedContext.to_eng_string(Decimal('-123E-12'))
+        '-123E-12'
+        >>> ExtendedContext.to_eng_string(Decimal('7E-7'))
+        '700E-9'
+        >>> ExtendedContext.to_eng_string(Decimal('7E+1'))
+        '70'
+        >>> ExtendedContext.to_eng_string(Decimal('0E+1'))
+        '0.00E+3'
+
         """
         a = _convert_other(a, raiseit=True)
         return a.to_eng_string(context=self)
diff --git a/lib-python/3/_pyio.py b/lib-python/3/_pyio.py
--- a/lib-python/3/_pyio.py
+++ b/lib-python/3/_pyio.py
@@ -276,7 +276,7 @@
 try:
     UnsupportedOperation = io.UnsupportedOperation
 except AttributeError:
-    class UnsupportedOperation(ValueError, OSError):
+    class UnsupportedOperation(OSError, ValueError):
         pass
 
 
diff --git a/lib-python/3/antigravity.py b/lib-python/3/antigravity.py
--- a/lib-python/3/antigravity.py
+++ b/lib-python/3/antigravity.py
@@ -2,7 +2,7 @@
 import webbrowser
 import hashlib
 
-webbrowser.open("http://xkcd.com/353/")
+webbrowser.open("https://xkcd.com/353/")
 
 def geohash(latitude, longitude, datedow):
     '''Compute geohash() using the Munroe algorithm.
diff --git a/lib-python/3/asyncio/base_events.py b/lib-python/3/asyncio/base_events.py
--- a/lib-python/3/asyncio/base_events.py
+++ b/lib-python/3/asyncio/base_events.py
@@ -13,7 +13,6 @@
 to modify the meaning of the API call itself.
 """
 
-
 import collections
 import concurrent.futures
 import heapq
@@ -28,6 +27,7 @@
 import traceback
 import sys
 import warnings
+import weakref
 
 from . import compat
 from . import coroutines
@@ -41,9 +41,6 @@
 __all__ = ['BaseEventLoop']
 
 
-# Argument for default thread pool executor creation.
-_MAX_WORKERS = 5
-
 # Minimum number of _scheduled timer handles before cleanup of
 # cancelled handles is performed.
 _MIN_SCHEDULED_TIMER_HANDLES = 100
@@ -76,12 +73,29 @@
         return repr(fd)
 
 
-# Linux's sock.type is a bitmask that can include extra info about socket.
-_SOCKET_TYPE_MASK = 0
-if hasattr(socket, 'SOCK_NONBLOCK'):
-    _SOCKET_TYPE_MASK |= socket.SOCK_NONBLOCK
-if hasattr(socket, 'SOCK_CLOEXEC'):
-    _SOCKET_TYPE_MASK |= socket.SOCK_CLOEXEC
+def _set_reuseport(sock):
+    if not hasattr(socket, 'SO_REUSEPORT'):
+        raise ValueError('reuse_port not supported by socket module')
+    else:
+        try:
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        except OSError:
+            raise ValueError('reuse_port not supported by socket module, '
+                             'SO_REUSEPORT defined but not implemented.')
+
+
+def _is_stream_socket(sock):
+    # Linux's socket.type is a bitmask that can include extra info
+    # about socket, therefore we can't do simple
+    # `sock_type == socket.SOCK_STREAM`.
+    return (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM
+
+
+def _is_dgram_socket(sock):
+    # Linux's socket.type is a bitmask that can include extra info
+    # about socket, therefore we can't do simple
+    # `sock_type == socket.SOCK_DGRAM`.
+    return (sock.type & socket.SOCK_DGRAM) == socket.SOCK_DGRAM
 
 
 def _ipaddr_info(host, port, family, type, proto):
@@ -94,8 +108,12 @@
             host is None:
         return None
 
-    type &= ~_SOCKET_TYPE_MASK
     if type == socket.SOCK_STREAM:
+        # Linux only:
+        #    getaddrinfo() can raise when socket.type is a bit mask.
+        #    So if socket.type is a bit mask of SOCK_STREAM, and say
+        #    SOCK_NONBLOCK, we simply return None, which will trigger
+        #    a call to getaddrinfo() letting it process this request.
         proto = socket.IPPROTO_TCP
     elif type == socket.SOCK_DGRAM:
         proto = socket.IPPROTO_UDP
@@ -104,27 +122,21 @@
 
     if port is None:
         port = 0
-    elif isinstance(port, bytes):
-        if port == b'':
-            port = 0
-        else:
-            try:
-                port = int(port)
-            except ValueError:
-                # Might be a service name like b"http".
-                port = socket.getservbyname(port.decode('ascii'))
-    elif isinstance(port, str):
-        if port == '':
-            port = 0
-        else:
-            try:
-                port = int(port)
-            except ValueError:
-                # Might be a service name like "http".
-                port = socket.getservbyname(port)
+    elif isinstance(port, bytes) and port == b'':
+        port = 0
+    elif isinstance(port, str) and port == '':
+        port = 0
+    else:
+        # If port's a service name like "http", don't skip getaddrinfo.
+        try:
+            port = int(port)
+        except (TypeError, ValueError):
+            return None
 
     if family == socket.AF_UNSPEC:
-        afs = [socket.AF_INET, socket.AF_INET6]
+        afs = [socket.AF_INET]
+        if hasattr(socket, 'AF_INET6'):
+            afs.append(socket.AF_INET6)
     else:
         afs = [family]
 
@@ -242,6 +254,17 @@
         self._task_factory = None
         self._coroutine_wrapper_set = False
 
+        if hasattr(sys, 'get_asyncgen_hooks'):
+            # Python >= 3.6
+            # A weak set of all asynchronous generators that are
+            # being iterated by the loop.
+            self._asyncgens = weakref.WeakSet()
+        else:
+            self._asyncgens = None
+
+        # Set to True when `loop.shutdown_asyncgens` is called.
+        self._asyncgens_shutdown_called = False
+
     def __repr__(self):
         return ('<%s running=%s closed=%s debug=%s>'
                 % (self.__class__.__name__, self.is_running(),
@@ -333,14 +356,67 @@
         if self._closed:
             raise RuntimeError('Event loop is closed')
 
+    def _asyncgen_finalizer_hook(self, agen):
+        self._asyncgens.discard(agen)
+        if not self.is_closed():
+            self.create_task(agen.aclose())
+            # Wake up the loop if the finalizer was called from
+            # a different thread.
+            self._write_to_self()
+
+    def _asyncgen_firstiter_hook(self, agen):
+        if self._asyncgens_shutdown_called:
+            warnings.warn(
+                "asynchronous generator {!r} was scheduled after "
+                "loop.shutdown_asyncgens() call".format(agen),
+                ResourceWarning, source=self)
+
+        self._asyncgens.add(agen)
+
+    @coroutine
+    def shutdown_asyncgens(self):
+        """Shutdown all active asynchronous generators."""
+        self._asyncgens_shutdown_called = True
+
+        if self._asyncgens is None or not len(self._asyncgens):
+            # If Python version is <3.6 or we don't have any asynchronous
+            # generators alive.
+            return
+
+        closing_agens = list(self._asyncgens)
+        self._asyncgens.clear()
+
+        shutdown_coro = tasks.gather(
+            *[ag.aclose() for ag in closing_agens],
+            return_exceptions=True,
+            loop=self)
+
+        results = yield from shutdown_coro
+        for result, agen in zip(results, closing_agens):
+            if isinstance(result, Exception):
+                self.call_exception_handler({
+                    'message': 'an error occurred during closing of '
+                               'asynchronous generator {!r}'.format(agen),
+                    'exception': result,
+                    'asyncgen': agen
+                })
+
     def run_forever(self):
         """Run until stop() is called."""
         self._check_closed()
         if self.is_running():
-            raise RuntimeError('Event loop is running.')
+            raise RuntimeError('This event loop is already running')
+        if events._get_running_loop() is not None:
+            raise RuntimeError(
+                'Cannot run the event loop while another loop is running')
         self._set_coroutine_wrapper(self._debug)
         self._thread_id = threading.get_ident()
+        if self._asyncgens is not None:
+            old_agen_hooks = sys.get_asyncgen_hooks()
+            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+                                   finalizer=self._asyncgen_finalizer_hook)
         try:
+            events._set_running_loop(self)
             while True:
                 self._run_once()
                 if self._stopping:
@@ -348,7 +424,10 @@
         finally:
             self._stopping = False
             self._thread_id = None
+            events._set_running_loop(None)
             self._set_coroutine_wrapper(False)
+            if self._asyncgens is not None:
+                sys.set_asyncgen_hooks(*old_agen_hooks)
 
     def run_until_complete(self, future):
         """Run until the Future is done.
@@ -363,7 +442,7 @@
         """
         self._check_closed()
 
-        new_task = not isinstance(future, futures.Future)
+        new_task = not futures.isfuture(future)
         future = tasks.ensure_future(future, loop=self)
         if new_task:
             # An exception is raised if the future didn't complete, so there
@@ -469,12 +548,10 @@
 
         Absolute time corresponds to the event loop's time() method.
         """
-        if (coroutines.iscoroutine(callback)
-        or coroutines.iscoroutinefunction(callback)):
-            raise TypeError("coroutines cannot be used with call_at()")
         self._check_closed()
         if self._debug:
             self._check_thread()
+            self._check_callback(callback, 'call_at')
         timer = events.TimerHandle(when, callback, args, self)
         if timer._source_traceback:
             del timer._source_traceback[-1]
@@ -492,18 +569,27 @@
         Any positional arguments after the callback will be passed to
         the callback when it is called.
         """
+        self._check_closed()
         if self._debug:
             self._check_thread()
+            self._check_callback(callback, 'call_soon')
         handle = self._call_soon(callback, args)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         return handle
 
+    def _check_callback(self, callback, method):
+        if (coroutines.iscoroutine(callback) or
+                coroutines.iscoroutinefunction(callback)):
+            raise TypeError(
+                "coroutines cannot be used with {}()".format(method))
+        if not callable(callback):
+            raise TypeError(
+                'a callable object was expected by {}(), got {!r}'.format(
+                    method, callback))
+
+
     def _call_soon(self, callback, args):
-        if (coroutines.iscoroutine(callback)
-        or coroutines.iscoroutinefunction(callback)):
-            raise TypeError("coroutines cannot be used with call_soon()")
-        self._check_closed()
         handle = events.Handle(callback, args, self)
         if handle._source_traceback:
             del handle._source_traceback[-1]
@@ -529,6 +615,9 @@
 
     def call_soon_threadsafe(self, callback, *args):
         """Like call_soon(), but thread-safe."""
+        self._check_closed()
+        if self._debug:
+            self._check_callback(callback, 'call_soon_threadsafe')
         handle = self._call_soon(callback, args)
         if handle._source_traceback:
             del handle._source_traceback[-1]
@@ -536,22 +625,13 @@
         return handle
 
     def run_in_executor(self, executor, func, *args):
-        if (coroutines.iscoroutine(func)
-        or coroutines.iscoroutinefunction(func)):
-            raise TypeError("coroutines cannot be used with run_in_executor()")
         self._check_closed()
-        if isinstance(func, events.Handle):
-            assert not args
-            assert not isinstance(func, events.TimerHandle)
-            if func._cancelled:
-                f = self.create_future()
-                f.set_result(None)
-                return f
-            func, args = func._callback, func._args
+        if self._debug:
+            self._check_callback(func, 'run_in_executor')
         if executor is None:
             executor = self._default_executor
             if executor is None:
-                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
+                executor = concurrent.futures.ThreadPoolExecutor()
                 self._default_executor = executor
         return futures.wrap_future(executor.submit(func, *args), loop=self)
 
@@ -703,11 +783,19 @@
                     raise OSError('Multiple exceptions: {}'.format(
                         ', '.join(str(exc) for exc in exceptions)))
 
-        elif sock is None:
-            raise ValueError(
-                'host and port was not specified and no sock specified')
-
-        sock.setblocking(False)
+        else:
+            if sock is None:
+                raise ValueError(
+                    'host and port was not specified and no sock specified')
+            if not _is_stream_socket(sock):
+                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
+                # are SOCK_STREAM.
+                # We support passing AF_UNIX sockets even though we have
+                # a dedicated API for that: create_unix_connection.
+                # Disallowing AF_UNIX in this method, breaks backwards
+                # compatibility.
+                raise ValueError(
+                    'A Stream Socket was expected, got {!r}'.format(sock))
 
         transport, protocol = yield from self._create_connection_transport(
             sock, protocol_factory, ssl, server_hostname)
@@ -721,14 +809,17 @@
 
     @coroutine
     def _create_connection_transport(self, sock, protocol_factory, ssl,
-                                     server_hostname):
+                                     server_hostname, server_side=False):
+
+        sock.setblocking(False)
+
         protocol = protocol_factory()
         waiter = self.create_future()
         if ssl:
             sslcontext = None if isinstance(ssl, bool) else ssl
             transport = self._make_ssl_transport(
                 sock, protocol, sslcontext, waiter,
-                server_side=False, server_hostname=server_hostname)
+                server_side=server_side, server_hostname=server_hostname)
         else:
             transport = self._make_socket_transport(sock, protocol, waiter)
 
@@ -748,6 +839,9 @@
                                  allow_broadcast=None, sock=None):
         """Create datagram connection."""
         if sock is not None:
+            if not _is_dgram_socket(sock):
+                raise ValueError(
+                    'A UDP Socket was expected, got {!r}'.format(sock))
             if (local_addr or remote_addr or
                     family or proto or flags or
                     reuse_address or reuse_port or allow_broadcast):
@@ -813,12 +907,7 @@
                         sock.setsockopt(
                             socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                     if reuse_port:
-                        if not hasattr(socket, 'SO_REUSEPORT'):
-                            raise ValueError(
-                                'reuse_port not supported by socket module')
-                        else:
-                            sock.setsockopt(
-                                socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+                        _set_reuseport(sock)
                     if allow_broadcast:
                         sock.setsockopt(
                             socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
@@ -941,12 +1030,7 @@
                         sock.setsockopt(
                             socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                     if reuse_port:
-                        if not hasattr(socket, 'SO_REUSEPORT'):
-                            raise ValueError(
-                                'reuse_port not supported by socket module')
-                        else:
-                            sock.setsockopt(
-                                socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
+                        _set_reuseport(sock)
                     # Disable IPv4/IPv6 dual stack support (enabled by
                     # default on Linux) which makes a single socket
                     # listen on both address families.
@@ -968,18 +1052,44 @@
         else:
             if sock is None:
                 raise ValueError('Neither host/port nor sock were specified')
+            if not _is_stream_socket(sock):
+                raise ValueError(
+                    'A Stream Socket was expected, got {!r}'.format(sock))
             sockets = [sock]
 
         server = Server(self, sockets)
         for sock in sockets:
             sock.listen(backlog)
             sock.setblocking(False)
-            self._start_serving(protocol_factory, sock, ssl, server)
+            self._start_serving(protocol_factory, sock, ssl, server, backlog)
         if self._debug:
             logger.info("%r is serving", server)
         return server
 
     @coroutine
+    def connect_accepted_socket(self, protocol_factory, sock, *, ssl=None):
+        """Handle an accepted connection.
+
+        This is used by servers that accept connections outside of
+        asyncio but that use asyncio to handle connections.
+
+        This method is a coroutine.  When completed, the coroutine
+        returns a (transport, protocol) pair.
+        """
+        if not _is_stream_socket(sock):
+            raise ValueError(
+                'A Stream Socket was expected, got {!r}'.format(sock))
+
+        transport, protocol = yield from self._create_connection_transport(
+            sock, protocol_factory, ssl, '', server_side=True)
+        if self._debug:
+            # Get the socket from the transport because SSL transport closes
+            # the old socket and creates a new SSL socket
+            sock = transport.get_extra_info('socket')
+            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
+        return transport, protocol
+
+    @coroutine
     def connect_read_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
         waiter = self.create_future()
@@ -1048,7 +1158,7 @@
         transport = yield from self._make_subprocess_transport(
             protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
         if self._debug:
-            logger.info('%s: %r' % (debug_log, transport))
+            logger.info('%s: %r', debug_log, transport)
         return transport, protocol
 
     @coroutine
@@ -1078,7 +1188,7 @@
             protocol, popen_args, False, stdin, stdout, stderr,
             bufsize, **kwargs)
         if self._debug:
-            logger.info('%s: %r' % (debug_log, transport))
+            logger.info('%s: %r', debug_log, transport)
         return transport, protocol
 
     def get_exception_handler(self):
@@ -1158,7 +1268,9 @@
         - 'handle' (optional): Handle instance;
         - 'protocol' (optional): Protocol instance;
         - 'transport' (optional): Transport instance;
-        - 'socket' (optional): Socket instance.
+        - 'socket' (optional): Socket instance;
+        - 'asyncgen' (optional): Asynchronous generator that caused
+                                 the exception.
 
         New keys maybe introduced in the future.
 
diff --git a/lib-python/3/asyncio/base_subprocess.py b/lib-python/3/asyncio/base_subprocess.py
--- a/lib-python/3/asyncio/base_subprocess.py
+++ b/lib-python/3/asyncio/base_subprocess.py
@@ -3,7 +3,6 @@
 import warnings
 
 from . import compat
-from . import futures
 from . import protocols
 from . import transports
 from .coroutines import coroutine
@@ -87,6 +86,12 @@
     def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
         raise NotImplementedError
 
+    def set_protocol(self, protocol):
+        self._protocol = protocol
+
+    def get_protocol(self):
+        return self._protocol
+
     def is_closing(self):
         return self._closed
 
diff --git a/lib-python/3/asyncio/coroutines.py b/lib-python/3/asyncio/coroutines.py
--- a/lib-python/3/asyncio/coroutines.py
+++ b/lib-python/3/asyncio/coroutines.py
@@ -33,12 +33,16 @@
 
 try:
     _types_coroutine = types.coroutine
+    _types_CoroutineType = types.CoroutineType
 except AttributeError:
+    # Python 3.4
     _types_coroutine = None
+    _types_CoroutineType = None
 
 try:
     _inspect_iscoroutinefunction = inspect.iscoroutinefunction
 except AttributeError:
+    # Python 3.4
     _inspect_iscoroutinefunction = lambda func: False
 
 try:
@@ -120,8 +124,8 @@
         def send(self, value):
             return self.gen.send(value)
 
-    def throw(self, exc):
-        return self.gen.throw(exc)
+    def throw(self, type, value=None, traceback=None):
+        return self.gen.throw(type, value, traceback)
 
     def close(self):
         return self.gen.close()
@@ -204,8 +208,8 @@
         @functools.wraps(func)
         def coro(*args, **kw):
             res = func(*args, **kw)
-            if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
-                    isinstance(res, CoroWrapper):
+            if (futures.isfuture(res) or inspect.isgenerator(res) or
+                isinstance(res, CoroWrapper)):
                 res = yield from res
             elif _AwaitableABC is not None:
                 # If 'func' returns an Awaitable (new in 3.5) we
@@ -238,19 +242,27 @@
             w.__qualname__ = getattr(func, '__qualname__', None)
             return w
 
-    wrapper._is_coroutine = True  # For iscoroutinefunction().
+    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
     return wrapper
 
 
+# A marker for iscoroutinefunction.
+_is_coroutine = object()
+
+
 def iscoroutinefunction(func):
     """Return True if func is a decorated coroutine function."""
-    return (getattr(func, '_is_coroutine', False) or
+    return (getattr(func, '_is_coroutine', None) is _is_coroutine or
             _inspect_iscoroutinefunction(func))
 
 
 _COROUTINE_TYPES = (types.GeneratorType, CoroWrapper)
 if _CoroutineABC is not None:
     _COROUTINE_TYPES += (_CoroutineABC,)
+if _types_CoroutineType is not None:
+    # Prioritize native coroutine check to speed-up
+    # asyncio.iscoroutine.
+    _COROUTINE_TYPES = (_types_CoroutineType,) + _COROUTINE_TYPES
 
 
 def iscoroutine(obj):
@@ -261,6 +273,29 @@
 def _format_coroutine(coro):
     assert iscoroutine(coro)
 
+    if not hasattr(coro, 'cr_code') and not hasattr(coro, 'gi_code'):
+        # Most likely a built-in type or a Cython coroutine.
+
+        # Built-in types might not have __qualname__ or __name__.
+        coro_name = getattr(
+            coro, '__qualname__',
+            getattr(coro, '__name__', type(coro).__name__))
+        coro_name = '{}()'.format(coro_name)
+
+        running = False
+        try:
+            running = coro.cr_running
+        except AttributeError:
+            try:
+                running = coro.gi_running
+            except AttributeError:
+                pass
+
+        if running:
+            return '{} running'.format(coro_name)
+        else:
+            return coro_name
+
     coro_name = None
     if isinstance(coro, CoroWrapper):
         func = coro.func
@@ -271,7 +306,7 @@
         func = coro
 
     if coro_name is None:
-        coro_name = events._format_callback(func, ())
+        coro_name = events._format_callback(func, (), {})
 
     try:
         coro_code = coro.gi_code
diff --git a/lib-python/3/asyncio/events.py b/lib-python/3/asyncio/events.py
--- a/lib-python/3/asyncio/events.py
+++ b/lib-python/3/asyncio/events.py
@@ -6,6 +6,7 @@
            'get_event_loop_policy', 'set_event_loop_policy',
            'get_event_loop', 'set_event_loop', 'new_event_loop',
            'get_child_watcher', 'set_child_watcher',
+           '_set_running_loop', '_get_running_loop',
            ]
 
 import functools
@@ -35,23 +36,25 @@
     return None
 
 
-def _format_args(args):
-    """Format function arguments.
+def _format_args_and_kwargs(args, kwargs):
+    """Format function arguments and keyword arguments.
 
     Special case for a single parameter: ('hello',) is formatted as ('hello').
     """
     # use reprlib to limit the length of the output
-    args_repr = reprlib.repr(args)
-    if len(args) == 1 and args_repr.endswith(',)'):
-        args_repr = args_repr[:-2] + ')'
-    return args_repr
+    items = []
+    if args:
+        items.extend(reprlib.repr(arg) for arg in args)
+    if kwargs:
+        items.extend('{}={}'.format(k, reprlib.repr(v))
+                     for k, v in kwargs.items())
+    return '(' + ', '.join(items) + ')'
 
 
-def _format_callback(func, args, suffix=''):
+def _format_callback(func, args, kwargs, suffix=''):
     if isinstance(func, functools.partial):
-        if args is not None:
-            suffix = _format_args(args) + suffix
-        return _format_callback(func.func, func.args, suffix)
+        suffix = _format_args_and_kwargs(args, kwargs) + suffix
+        return _format_callback(func.func, func.args, func.keywords, suffix)
 
     if hasattr(func, '__qualname__'):
         func_repr = getattr(func, '__qualname__')
@@ -60,14 +63,13 @@
     else:
         func_repr = repr(func)
 
-    if args is not None:
-        func_repr += _format_args(args)
+    func_repr += _format_args_and_kwargs(args, kwargs)
     if suffix:
         func_repr += suffix
     return func_repr
 
 def _format_callback_source(func, args):
-    func_repr = _format_callback(func, args)
+    func_repr = _format_callback(func, args, None)
     source = _get_function_source(func)
     if source:
         func_repr += ' at %s:%s' % source
@@ -81,7 +83,6 @@
                  '_source_traceback', '_repr', '__weakref__')
 
     def __init__(self, callback, args, loop):
-        assert not isinstance(callback, Handle), 'A Handle is not a callback'
         self._loop = loop
         self._callback = callback
         self._args = args
@@ -248,6 +249,10 @@
         """
         raise NotImplementedError
 
+    def shutdown_asyncgens(self):
+        """Shutdown all active asynchronous generators."""
+        raise NotImplementedError
+
     # Methods scheduling callbacks.  All these return Handles.
 
     def _timer_handle_cancelled(self, handle):
@@ -603,6 +608,30 @@
 _lock = threading.Lock()
 
 
+# A TLS for the running event loop, used by _get_running_loop.
+class _RunningLoop(threading.local):
+    _loop = None
+_running_loop = _RunningLoop()
+
+
+def _get_running_loop():
+    """Return the running event loop or None.
+
+    This is a low-level function intended to be used by event loops.
+    This function is thread-specific.
+    """
+    return _running_loop._loop
+
+
+def _set_running_loop(loop):
+    """Set the running event loop.
+
+    This is a low-level function intended to be used by event loops.
+    This function is thread-specific.
+    """
+    _running_loop._loop = loop
+
+
 def _init_event_loop_policy():
     global _event_loop_policy
     with _lock:
@@ -628,7 +657,17 @@
 
 
 def get_event_loop():
-    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
+    """Return an asyncio event loop.
+
+    When called from a coroutine or a callback (e.g. scheduled with call_soon
+    or similar API), this function will always return the running event loop.
+
+    If there is no running event loop set, the function will return
+    the result of `get_event_loop_policy().get_event_loop()` call.
+    """
+    current_loop = _get_running_loop()
+    if current_loop is not None:
+        return current_loop
     return get_event_loop_policy().get_event_loop()
 
 
diff --git a/lib-python/3/asyncio/futures.py b/lib-python/3/asyncio/futures.py
--- a/lib-python/3/asyncio/futures.py
+++ b/lib-python/3/asyncio/futures.py
@@ -2,7 +2,7 @@
 
 __all__ = ['CancelledError', 'TimeoutError',
            'InvalidStateError',
-           'Future', 'wrap_future',
+           'Future', 'wrap_future', 'isfuture',
            ]
 
 import concurrent.futures._base
@@ -110,6 +110,17 @@
             self.loop.call_exception_handler({'message': msg})
 
 
+def isfuture(obj):
+    """Check for a Future.
+
+    This returns True when obj is a Future instance or is advertising
+    itself as duck-type compatible by setting _asyncio_future_blocking.
+    See comment in Future for more details.
+    """
+    return (hasattr(obj.__class__, '_asyncio_future_blocking') and
+            obj._asyncio_future_blocking is not None)
+
+
 class Future:
     """This class is *almost* compatible with concurrent.futures.Future.
 
@@ -134,7 +145,15 @@
     _loop = None
     _source_traceback = None
 
-    _blocking = False  # proper use of future (yield vs yield from)
+    # This field is used for a dual purpose:
+    # - Its presence is a marker to declare that a class implements
+    #   the Future protocol (i.e. is intended to be duck-type compatible).
+    #   The value must also be not-None, to enable a subclass to declare
+    #   that it is not compatible by setting this to None.
+    # - It is set by __iter__() below so that Task._step() can tell
+    #   the difference between `yield from Future()` (correct) vs.
+    #   `yield Future()` (incorrect).
+    _asyncio_future_blocking = False
 
     _log_traceback = False   # Used for Python 3.4 and later
     _tb_logger = None        # Used for Python 3.3 only
@@ -357,7 +376,7 @@
 
     def __iter__(self):
         if not self.done():
-            self._blocking = True
+            self._asyncio_future_blocking = True
             yield self  # This tells Task to wait for completion.
         assert self.done(), "yield from wasn't used with future"
         return self.result()  # May raise too.
@@ -415,15 +434,17 @@
     If destination is cancelled, source gets cancelled too.
     Compatible with both asyncio.Future and concurrent.futures.Future.
     """
-    if not isinstance(source, (Future, concurrent.futures.Future)):
+    if not isfuture(source) and not isinstance(source,
+                                               concurrent.futures.Future):
         raise TypeError('A future is required for source argument')
-    if not isinstance(destination, (Future, concurrent.futures.Future)):
+    if not isfuture(destination) and not isinstance(destination,
+                                                    concurrent.futures.Future):
         raise TypeError('A future is required for destination argument')
-    source_loop = source._loop if isinstance(source, Future) else None
-    dest_loop = destination._loop if isinstance(destination, Future) else None
+    source_loop = source._loop if isfuture(source) else None
+    dest_loop = destination._loop if isfuture(destination) else None
 
     def _set_state(future, other):
-        if isinstance(future, Future):
+        if isfuture(future):
             _copy_future_state(other, future)
         else:
             _set_concurrent_future_state(future, other)
@@ -447,7 +468,7 @@
 
 def wrap_future(future, *, loop=None):
     """Wrap concurrent.futures.Future object."""
-    if isinstance(future, Future):
+    if isfuture(future):
         return future
     assert isinstance(future, concurrent.futures.Future), \
         'concurrent.futures.Future is expected, got {!r}'.format(future)
diff --git a/lib-python/3/asyncio/locks.py b/lib-python/3/asyncio/locks.py
--- a/lib-python/3/asyncio/locks.py
+++ b/lib-python/3/asyncio/locks.py
@@ -166,7 +166,7 @@
         This method blocks until the lock is unlocked, then sets it to
         locked and returns True.
         """
-        if not self._waiters and not self._locked:
+        if not self._locked and all(w.cancelled() for w in self._waiters):
             self._locked = True
             return True
 
diff --git a/lib-python/3/asyncio/proactor_events.py b/lib-python/3/asyncio/proactor_events.py
--- a/lib-python/3/asyncio/proactor_events.py
+++ b/lib-python/3/asyncio/proactor_events.py
@@ -66,6 +66,12 @@
     def _set_extra(self, sock):
         self._extra['pipe'] = sock
 
+    def set_protocol(self, protocol):
+        self._protocol = protocol
+
+    def get_protocol(self):
+        return self._protocol
+
     def is_closing(self):
         return self._closing
 
@@ -488,7 +494,7 @@
         self._csock.send(b'\0')
 
     def _start_serving(self, protocol_factory, sock,
-                       sslcontext=None, server=None):
+                       sslcontext=None, server=None, backlog=100):
 
         def loop(f=None):
             try:
diff --git a/lib-python/3/asyncio/queues.py b/lib-python/3/asyncio/queues.py
--- a/lib-python/3/asyncio/queues.py
+++ b/lib-python/3/asyncio/queues.py
@@ -7,7 +7,6 @@
 
 from . import compat
 from . import events
-from . import futures
 from . import locks
 from .coroutines import coroutine
 
diff --git a/lib-python/3/asyncio/selector_events.py b/lib-python/3/asyncio/selector_events.py
--- a/lib-python/3/asyncio/selector_events.py
+++ b/lib-python/3/asyncio/selector_events.py
@@ -11,6 +11,7 @@
 import functools
 import socket
 import warnings
+import weakref
 try:
     import ssl
 except ImportError:  # pragma: no cover
@@ -39,6 +40,17 @@
         return bool(key.events & event)
 
 
+def _set_nodelay(sock):
+    """Disable Nagle's algorithm on sock if it is a TCP stream socket."""
+    # On Linux, sock.type may also carry flag bits such as SOCK_NONBLOCK,
+    # so test the SOCK_STREAM bit instead of comparing for equality.
+    if (hasattr(socket, 'TCP_NODELAY') and
+            sock.family in {socket.AF_INET, socket.AF_INET6} and
+            (sock.type & socket.SOCK_STREAM) == socket.SOCK_STREAM and
+            sock.proto == socket.IPPROTO_TCP):
+        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+
 class BaseSelectorEventLoop(base_events.BaseEventLoop):
     """Selector event loop.
 
@@ -53,6 +65,7 @@
         logger.debug('Using selector: %s', selector.__class__.__name__)
         self._selector = selector
         self._make_self_pipe()
+        self._transports = weakref.WeakValueDictionary()
 
     def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                extra=None, server=None):
@@ -104,7 +117,7 @@
         raise NotImplementedError
 
     def _close_self_pipe(self):
-        self.remove_reader(self._ssock.fileno())
+        self._remove_reader(self._ssock.fileno())
         self._ssock.close()
         self._ssock = None
         self._csock.close()
@@ -117,7 +130,7 @@
         self._ssock.setblocking(False)
         self._csock.setblocking(False)
         self._internal_fds += 1
-        self.add_reader(self._ssock.fileno(), self._read_from_self)
+        self._add_reader(self._ssock.fileno(), self._read_from_self)
 
     def _process_self_data(self, data):
         pass
@@ -151,43 +164,50 @@
                                  exc_info=True)
 
     def _start_serving(self, protocol_factory, sock,
-                       sslcontext=None, server=None):
-        self.add_reader(sock.fileno(), self._accept_connection,
-                        protocol_factory, sock, sslcontext, server)
+                       sslcontext=None, server=None, backlog=100):
+        self._add_reader(sock.fileno(), self._accept_connection,
+                         protocol_factory, sock, sslcontext, server, backlog)
 
     def _accept_connection(self, protocol_factory, sock,
-                           sslcontext=None, server=None):
-        try:
-            conn, addr = sock.accept()
-            if self._debug:
-                logger.debug("%r got a new connection from %r: %r",
-                             server, addr, conn)
-            conn.setblocking(False)
-        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
-            pass  # False alarm.
-        except OSError as exc:
-            # There's nowhere to send the error, so just log it.
-            if exc.errno in (errno.EMFILE, errno.ENFILE,
-                             errno.ENOBUFS, errno.ENOMEM):
-                # Some platforms (e.g. Linux keep reporting the FD as
-                # ready, so we remove the read handler temporarily.
-                # We'll try again in a while.
-                self.call_exception_handler({
-                    'message': 'socket.accept() out of system resource',
-                    'exception': exc,
-                    'socket': sock,
-                })
-                self.remove_reader(sock.fileno())
-                self.call_later(constants.ACCEPT_RETRY_DELAY,
-                                self._start_serving,
-                                protocol_factory, sock, sslcontext, server)
+                           sslcontext=None, server=None, backlog=100):
+        # This method is only called once for each event loop tick where the
+        # listening socket has triggered an EVENT_READ. There may be multiple
+        # connections waiting for an .accept() so it is called in a loop.
+        # See https://bugs.python.org/issue27906 for more details.
+        for _ in range(backlog):
+            try:
+                conn, addr = sock.accept()
+                if self._debug:
+                    logger.debug("%r got a new connection from %r: %r",
+                                 server, addr, conn)
+                conn.setblocking(False)
+            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
+                # Early exit because the socket accept buffer is empty.
+                return None
+            except OSError as exc:
+                # There's nowhere to send the error, so just log it.
+                if exc.errno in (errno.EMFILE, errno.ENFILE,
+                                 errno.ENOBUFS, errno.ENOMEM):
+                    # Some platforms (e.g. Linux) keep reporting the FD as
+                    # ready, so we remove the read handler temporarily.
+                    # We'll try again in a while.
+                    self.call_exception_handler({
+                        'message': 'socket.accept() out of system resource',
+                        'exception': exc,
+                        'socket': sock,
+                    })
+                    self._remove_reader(sock.fileno())
+                    self.call_later(constants.ACCEPT_RETRY_DELAY,
+                                    self._start_serving, protocol_factory,
+                                    sock, sslcontext, server, backlog)
+                    return  # Reader removed; resume on the scheduled retry.
+                else:
+                    raise  # The event loop will catch, log and ignore it.
             else:
-                raise  # The event loop will catch, log and ignore it.
-        else:
-            extra = {'peername': addr}
-            accept = self._accept_connection2(protocol_factory, conn, extra,
-                                              sslcontext, server)
-            self.create_task(accept)
+                extra = {'peername': addr}
+                accept = self._accept_connection2(protocol_factory, conn, extra,
+                                                  sslcontext, server)
+                self.create_task(accept)
 
     @coroutine
     def _accept_connection2(self, protocol_factory, conn, extra,
@@ -226,8 +246,18 @@
                     context['transport'] = transport
                 self.call_exception_handler(context)
 
-    def add_reader(self, fd, callback, *args):
-        """Add a reader callback."""
+    def _ensure_fd_no_transport(self, fd):
+        """Raise RuntimeError if fd is owned by a transport still in use."""
+        try:  # _transports is keyed by _sock_fd; accept file objects too.
+            fileno = fd if isinstance(fd, int) else int(fd.fileno())
+        except (AttributeError, TypeError, ValueError):
+            return  # Not a file object; presumably rejected by the selector.
+        transport = self._transports.get(fileno)
+        if transport is not None and not transport.is_closing():
+            raise RuntimeError('File descriptor {!r} is used by transport '
+                               '{!r}'.format(fd, transport))
+
+    def _add_reader(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -242,8 +272,7 @@
             if reader is not None:
                 reader.cancel()
 
-    def remove_reader(self, fd):
-        """Remove a reader callback."""
+    def _remove_reader(self, fd):
         if self.is_closed():
             return False
         try:
@@ -264,8 +293,7 @@
             else:
                 return False
 
-    def add_writer(self, fd, callback, *args):
-        """Add a writer callback.."""
+    def _add_writer(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -280,7 +308,7 @@
             if writer is not None:
                 writer.cancel()
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         """Remove a writer callback."""
         if self.is_closed():
             return False
@@ -303,6 +331,26 @@
             else:
                 return False
 
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def sock_recv(self, sock, n):
         """Receive data from the socket.
 
@@ -382,6 +430,7 @@
                 data = data[n:]
             self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
 
+    @coroutine
     def sock_connect(self, sock, address):
         """Connect to a remote socket at address.
 
@@ -390,23 +439,16 @@
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
 
+        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
+            resolved = base_events._ensure_resolved(
+                address, family=sock.family, proto=sock.proto, loop=self)
+            if not resolved.done():
+                yield from resolved
+            _, _, _, _, address = resolved.result()[0]
+
         fut = self.create_future()
-        if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
-            self._sock_connect(fut, sock, address)
-        else:
-            resolved = base_events._ensure_resolved(address, loop=self)
-            resolved.add_done_callback(
-                lambda resolved: self._on_resolved(fut, sock, resolved))
-
-        return fut
-
-    def _on_resolved(self, fut, sock, resolved):
-        try:
-            _, _, _, _, address = resolved.result()[0]
-        except Exception as exc:
-            fut.set_exception(exc)
-        else:
-            self._sock_connect(fut, sock, address)
+        self._sock_connect(fut, sock, address)
+        return (yield from fut)
 
     def _sock_connect(self, fut, sock, address):
         fd = sock.fileno()
@@ -417,8 +459,8 @@
             # connection runs in background. We have to wait until the socket
             # becomes writable to be notified when the connection succeed or
             # fails.
-            fut.add_done_callback(functools.partial(self._sock_connect_done,
-                                                    fd))
+            fut.add_done_callback(
+                functools.partial(self._sock_connect_done, fd))
             self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
         except Exception as exc:
             fut.set_exception(exc)
@@ -482,17 +524,17 @@
             fileobj, (reader, writer) = key.fileobj, key.data
             if mask & selectors.EVENT_READ and reader is not None:
                 if reader._cancelled:
-                    self.remove_reader(fileobj)
+                    self._remove_reader(fileobj)
                 else:
                     self._add_callback(reader)
             if mask & selectors.EVENT_WRITE and writer is not None:
                 if writer._cancelled:
-                    self.remove_writer(fileobj)
+                    self._remove_writer(fileobj)
                 else:
                     self._add_callback(writer)
 
     def _stop_serving(self, sock):
-        self.remove_reader(sock.fileno())
+        self._remove_reader(sock.fileno())
         sock.close()
 
 
@@ -527,6 +569,7 @@
         self._closing = False  # Set when close() called.
         if self._server is not None:
             self._server._attach()
+        loop._transports[self._sock_fd] = self
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -559,6 +602,12 @@
     def abort(self):
         self._force_close(None)
 
+    def set_protocol(self, protocol):
+        self._protocol = protocol
+
+    def get_protocol(self):
+        return self._protocol
+
     def is_closing(self):
         return self._closing
 
@@ -566,10 +615,10 @@
         if self._closing:
             return
         self._closing = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if not self._buffer:
             self._conn_lost += 1
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._loop.call_soon(self._call_connection_lost, None)
 
     # On Python 3.3 and older, objects with a destructor part of a reference
@@ -600,10 +649,10 @@
             return
         if self._buffer:
             self._buffer.clear()
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
         if not self._closing:
             self._closing = True
-            self._loop.remove_reader(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
         self._conn_lost += 1
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -633,9 +682,14 @@
         self._eof = False
         self._paused = False
 
+        # Disable the Nagle algorithm -- small writes will be
+        # sent without waiting for the TCP ACK.  This generally
+        # decreases the latency (in some cases significantly.)
+        _set_nodelay(self._sock)
+
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -648,7 +702,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -658,7 +712,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -682,7 +736,7 @@
                     # We're keeping the connection open so the
                     # protocol can write more, but we still can't
                     # receive more, so remove the reader callback.
-                    self._loop.remove_reader(self._sock_fd)
+                    self._loop._remove_reader(self._sock_fd)
                 else:
                     self.close()
 
@@ -715,7 +769,7 @@
                 if not data:
                     return
             # Not all was written; register write handler.
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -731,7 +785,7 @@
         except (BlockingIOError, InterruptedError):
             pass
         except Exception as exc:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._buffer.clear()
             self._fatal_error(exc, 'Fatal write error on socket transport')
         else:
@@ -739,7 +793,7 @@
                 del self._buffer[:n]
             self._maybe_resume_protocol()  # May append to buffer.
             if not self._buffer:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 if self._closing:
                     self._call_connection_lost(None)
                 elif self._eof:
@@ -810,19 +864,19 @@
         try:
             self._sock.do_handshake()
         except ssl.SSLWantReadError:
-            self._loop.add_reader(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_reader(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except ssl.SSLWantWriteError:
-            self._loop.add_writer(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_writer(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except BaseException as exc:
             if self._loop.get_debug():
                 logger.warning("%r: SSL handshake failed",
                                self, exc_info=True)
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._sock.close()
             self._wakeup_waiter(exc)
             if isinstance(exc, Exception):
@@ -830,8 +884,8 @@
             else:
                 raise
 
-        self._loop.remove_reader(self._sock_fd)
-        self._loop.remove_writer(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
+        self._loop._remove_writer(self._sock_fd)
 
         peercert = self._sock.getpeercert()
         if not hasattr(self._sslcontext, 'check_hostname'):
@@ -859,7 +913,7 @@
 
         self._read_wants_write = False
         self._write_wants_read = False
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         self._protocol_connected = True
         self._loop.call_soon(self._protocol.connection_made, self)
         # only wake up the waiter when connection_made() has been called
@@ -881,7 +935,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -891,7 +945,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -903,7 +957,7 @@
             self._write_ready()
 
             if self._buffer:
-                self._loop.add_writer(self._sock_fd, self._write_ready)
+                self._loop._add_writer(self._sock_fd, self._write_ready)
 
         try:
             data = self._sock.recv(self.max_size)
@@ -911,8 +965,8 @@
             pass
         except ssl.SSLWantWriteError:
             self._read_wants_write = True
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
         except Exception as exc:
             self._fatal_error(exc, 'Fatal read error on SSL transport')
         else:
@@ -937,7 +991,7 @@
             self._read_ready()
 
             if not (self._paused or self._closing):
-                self._loop.add_reader(self._sock_fd, self._read_ready)
+                self._loop._add_reader(self._sock_fd, self._read_ready)
 
         if self._buffer:
             try:
@@ -946,10 +1000,10 @@
                 n = 0
             except ssl.SSLWantReadError:
                 n = 0
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._write_wants_read = True
             except Exception as exc:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._buffer.clear()
                 self._fatal_error(exc, 'Fatal write error on SSL transport')
                 return
@@ -960,7 +1014,7 @@
         self._maybe_resume_protocol()  # May append to buffer.
 
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
 
@@ -978,7 +1032,7 @@
             return
 
         if not self._buffer:
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -998,7 +1052,7 @@
         self._address = address
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -1048,7 +1102,7 @@
                     self._sock.sendto(data, addr)
                 return
             except (BlockingIOError, InterruptedError):
-                self._loop.add_writer(self._sock_fd, self._sendto_ready)
+                self._loop._add_writer(self._sock_fd, self._sendto_ready)
             except OSError as exc:
                 self._protocol.error_received(exc)
                 return
@@ -1082,6 +1136,6 @@
 
         self._maybe_resume_protocol()  # May append to buffer.
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
diff --git a/lib-python/3/asyncio/sslproto.py b/lib-python/3/asyncio/sslproto.py
--- a/lib-python/3/asyncio/sslproto.py
+++ b/lib-python/3/asyncio/sslproto.py
@@ -5,6 +5,7 @@
 except ImportError:  # pragma: no cover
     ssl = None
 
+from . import base_events
 from . import compat
 from . import protocols
 from . import transports
@@ -304,6 +305,12 @@
         """Get optional transport information."""
         return self._ssl_protocol._get_extra_info(name, default)
 
+    def set_protocol(self, protocol):
+        self._app_protocol = protocol
+
+    def get_protocol(self):
+        return self._app_protocol
+
     def is_closing(self):
         return self._closed
 
@@ -403,7 +410,8 @@
     """
 
     def __init__(self, loop, app_protocol, sslcontext, waiter,
-                 server_side=False, server_hostname=None):
+                 server_side=False, server_hostname=None,
+                 call_connection_made=True):
         if ssl is None:
             raise RuntimeError('stdlib ssl module not available')
 
@@ -436,6 +444,7 @@
         self._in_shutdown = False
         # transport, ex: SelectorSocketTransport
         self._transport = None
+        self._call_connection_made = call_connection_made
 
     def _wakeup_waiter(self, exc=None):
         if self._waiter is None:
@@ -470,6 +479,7 @@
             self._loop.call_soon(self._app_protocol.connection_lost, exc)
         self._transport = None
         self._app_transport = None
+        self._wakeup_waiter(exc)
 
     def pause_writing(self):
         """Called when the low-level transport's buffer goes over
@@ -599,7 +609,8 @@
                            compression=sslobj.compression(),
                            ssl_object=sslobj,
                            )
-        self._app_protocol.connection_made(self._app_transport)
+        if self._call_connection_made:
+            self._app_protocol.connection_made(self._app_transport)
         self._wakeup_waiter()
         self._session_established = True
         # In case transport.write() was already called. Don't call
diff --git a/lib-python/3/asyncio/streams.py b/lib-python/3/asyncio/streams.py
--- a/lib-python/3/asyncio/streams.py
+++ b/lib-python/3/asyncio/streams.py
@@ -448,6 +448,7 @@
         assert not self._eof, '_wait_for_data after EOF'
 
         # Waiting for data while paused will make deadlock, so prevent it.
+        # This is essential for readexactly(n) when n > self._limit.
         if self._paused:
             self._paused = False
             self._transport.resume_reading()
@@ -590,7 +591,7 @@
         bytes. If the EOF was received and the internal buffer is empty, return
         an empty bytes object.
 
-        If n is zero, return empty bytes object immediatelly.
+        If n is zero, return empty bytes object immediately.
 
         If n is positive, this function try to read `n` bytes, and may return
         less or equal bytes than requested, but at least one byte. If EOF was
@@ -658,25 +659,22 @@
         if n == 0:
             return b''
 
-        # There used to be "optimized" code here.  It created its own
-        # Future and waited until self._buffer had at least the n
-        # bytes, then called read(n).  Unfortunately, this could pause
-        # the transport if the argument was larger than the pause
-        # limit (which is twice self._limit).  So now we just read()
-        # into a local buffer.
+        while len(self._buffer) < n:
+            if self._eof:
+                incomplete = bytes(self._buffer)
+                self._buffer.clear()
+                raise IncompleteReadError(incomplete, n)
 
-        blocks = []
-        while n > 0:
-            block = yield from self.read(n)
-            if not block:
-                partial = b''.join(blocks)
-                raise IncompleteReadError(partial, len(partial) + n)
-            blocks.append(block)
-            n -= len(block)
+            yield from self._wait_for_data('readexactly')
 
-        assert n == 0
-
-        return b''.join(blocks)
+        if len(self._buffer) == n:
+            data = bytes(self._buffer)
+            self._buffer.clear()
+        else:
+            data = bytes(self._buffer[:n])
+            del self._buffer[:n]
+        self._maybe_resume_transport()
+        return data
 
     if compat.PY35:
         @coroutine
diff --git a/lib-python/3/asyncio/tasks.py b/lib-python/3/asyncio/tasks.py
--- a/lib-python/3/asyncio/tasks.py
+++ b/lib-python/3/asyncio/tasks.py
@@ -241,7 +241,7 @@
                 result = coro.throw(exc)
         except StopIteration as exc:
             self.set_result(exc.value)
-        except futures.CancelledError as exc:
+        except futures.CancelledError:
             super().cancel()  # I.e., Future.cancel(self).
         except Exception as exc:
             self.set_exception(exc)
@@ -249,7 +249,8 @@
             self.set_exception(exc)
             raise
         else:
-            if isinstance(result, futures.Future):
+            blocking = getattr(result, '_asyncio_future_blocking', None)
+            if blocking is not None:
                 # Yielded Future must come from Future.__iter__().
                 if result._loop is not self._loop:
                     self._loop.call_soon(
@@ -257,13 +258,20 @@
                         RuntimeError(
                             'Task {!r} got Future {!r} attached to a '
                             'different loop'.format(self, result)))
-                elif result._blocking:
-                    result._blocking = False
-                    result.add_done_callback(self._wakeup)
-                    self._fut_waiter = result
-                    if self._must_cancel:
-                        if self._fut_waiter.cancel():
-                            self._must_cancel = False
+                elif blocking:
+                    if result is self:
+                        self._loop.call_soon(
+                            self._step,
+                            RuntimeError(
+                                'Task cannot await on itself: {!r}'.format(
+                                    self)))
+                    else:
+                        result._asyncio_future_blocking = False
+                        result.add_done_callback(self._wakeup)
+                        self._fut_waiter = result
+                        if self._must_cancel:
+                            if self._fut_waiter.cancel():
+                                self._must_cancel = False
                 else:
                     self._loop.call_soon(
                         self._step,
@@ -332,7 +340,7 @@
     Note: This does not raise TimeoutError! Futures that aren't done
     when the timeout occurs are returned in the second set.
     """
-    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
     if not fs:
         raise ValueError('Set of coroutines/Futures is empty.')
@@ -461,7 +469,7 @@
 
     Note: The futures 'f' are not necessarily members of fs.
     """
-    if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
         raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
     loop = loop if loop is not None else events.get_event_loop()
     todo = {ensure_future(f, loop=loop) for f in set(fs)}
@@ -518,7 +526,7 @@
         h.cancel()
 
 
-def async(coro_or_future, *, loop=None):
+def async_(coro_or_future, *, loop=None):
     """Wrap a coroutine in a future.
 
     If the argument is a Future, it is returned directly.
@@ -531,13 +539,18 @@
 
     return ensure_future(coro_or_future, loop=loop)
 
+# Silence DeprecationWarning:
+globals()['async'] = async_
+async_.__name__ = 'async'
+del async_
+
 
 def ensure_future(coro_or_future, *, loop=None):
     """Wrap a coroutine or an awaitable in a future.
 
     If the argument is a Future, it is returned directly.
     """
-    if isinstance(coro_or_future, futures.Future):
+    if futures.isfuture(coro_or_future):
         if loop is not None and loop is not coro_or_future._loop:
             raise ValueError('loop argument must agree with Future')
         return coro_or_future
@@ -579,15 +592,21 @@
     def cancel(self):
         if self.done():
             return False
+        ret = False
         for child in self._children:
-            child.cancel()
-        return True
+            if child.cancel():
+                ret = True
+        return ret
 
 
 def gather(*coros_or_futures, loop=None, return_exceptions=False):
     """Return a future aggregating results from the given coroutines
     or futures.
 
+    Coroutines will be wrapped in a future and scheduled in the event
+    loop. They will not necessarily be scheduled in the same order as
+    passed in.
+
     All futures must share the same event loop.  If all the tasks are
     done successfully, the returned future's result is the list of
     results (in the order of the original sequence, not necessarily
@@ -613,7 +632,7 @@
 
     arg_to_fut = {}
     for arg in set(coros_or_futures):
-        if not isinstance(arg, futures.Future):
+        if not futures.isfuture(arg):
             fut = ensure_future(arg, loop=loop)
             if loop is None:
                 loop = fut._loop
diff --git a/lib-python/3/asyncio/test_utils.py b/lib-python/3/asyncio/test_utils.py
--- a/lib-python/3/asyncio/test_utils.py
+++ b/lib-python/3/asyncio/test_utils.py
@@ -13,6 +13,8 @@
 import threading
 import time
 import unittest
+import weakref
+
 from unittest import mock
 
 from http.server import HTTPServer
@@ -300,6 +302,8 @@
         self.writers = {}
         self.reset_counters()
 
+        self._transports = weakref.WeakValueDictionary()
+
     def time(self):
         return self._time
 
@@ -318,10 +322,10 @@
             else:  # pragma: no cover
                 raise AssertionError("Time generator is not finished")
 
-    def add_reader(self, fd, callback, *args):
+    def _add_reader(self, fd, callback, *args):
         self.readers[fd] = events.Handle(callback, args, self)
 
-    def remove_reader(self, fd):
+    def _remove_reader(self, fd):
         self.remove_reader_count[fd] += 1
         if fd in self.readers:
             del self.readers[fd]
@@ -337,10 +341,10 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
-    def add_writer(self, fd, callback, *args):
+    def _add_writer(self, fd, callback, *args):
         self.writers[fd] = events.Handle(callback, args, self)
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         self.remove_writer_count[fd] += 1
         if fd in self.writers:
             del self.writers[fd]
@@ -356,6 +360,36 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
+    def _ensure_fd_no_transport(self, fd):
+        try:
+            transport = self._transports[fd]
+        except KeyError:
+            pass
+        else:
+            raise RuntimeError(
+                'File descriptor {!r} is used by transport {!r}'.format(
+                    fd, transport))
+
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def reset_counters(self):
         self.remove_reader_count = collections.defaultdict(int)
         self.remove_writer_count = collections.defaultdict(int)
@@ -415,7 +449,13 @@
         self.set_event_loop(loop)
         return loop
 
+    def setUp(self):
+        self._get_running_loop = events._get_running_loop
+        events._get_running_loop = lambda: None
+
     def tearDown(self):
+        events._get_running_loop = self._get_running_loop
+
         events.set_event_loop(None)
 
         # Detect CPython bug #23353: ensure that yield/yield-from is not used
diff --git a/lib-python/3/asyncio/transports.py b/lib-python/3/asyncio/transports.py
--- a/lib-python/3/asyncio/transports.py
+++ b/lib-python/3/asyncio/transports.py
@@ -33,6 +33,14 @@
         """
         raise NotImplementedError
 
+    def set_protocol(self, protocol):
+        """Set a new protocol."""
+        raise NotImplementedError
+
+    def get_protocol(self):
+        """Return the current protocol."""
+        raise NotImplementedError
+
 
 class ReadTransport(BaseTransport):
     """Interface for read-only transports."""
diff --git a/lib-python/3/asyncio/unix_events.py b/lib-python/3/asyncio/unix_events.py
--- a/lib-python/3/asyncio/unix_events.py
+++ b/lib-python/3/asyncio/unix_events.py
@@ -39,6 +39,13 @@
     pass
 
 
+try:
+    _fspath = os.fspath
+except AttributeError:
+    # Python 3.5 or earlier
+    _fspath = lambda path: path
+
+
 class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     """Unix event loop.
 
@@ -234,6 +241,11 @@
         else:
             if sock is None:
                 raise ValueError('no path and sock were specified')
+            if (sock.family != socket.AF_UNIX or
+                    not base_events._is_stream_socket(sock)):
+                raise ValueError(
+                    'A UNIX Domain Stream Socket was expected, got {!r}'
+                    .format(sock))
             sock.setblocking(False)
 
         transport, protocol = yield from self._create_connection_transport(
@@ -251,8 +263,20 @@
                 raise ValueError(
                     'path and sock can not be specified at the same time')
 
+            path = _fspath(path)
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
+            # Check for abstract socket. `str` and `bytes` paths are supported.
+            if path[0] not in (0, '\x00'):
+                try:
+                    if stat.S_ISSOCK(os.stat(path).st_mode):
+                        os.remove(path)
+                except FileNotFoundError:
+                    pass
+                except OSError as err:
+                    # Directory may have permissions only to create socket.
+                    logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
+
             try:
                 sock.bind(path)
             except OSError as exc:
@@ -272,9 +296,11 @@
                 raise ValueError(
                     'path was not specified, and no sock specified')
 
-            if sock.family != socket.AF_UNIX:
+            if (sock.family != socket.AF_UNIX or
+                    not base_events._is_stream_socket(sock)):
                 raise ValueError(
-                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
+                    'A UNIX Domain Stream Socket was expected, got {!r}'
+                    .format(sock))
 
         server = base_events.Server(self, [sock])
         sock.listen(backlog)
@@ -305,17 +331,23 @@
         self._loop = loop
         self._pipe = pipe
         self._fileno = pipe.fileno()
+        self._protocol = protocol
+        self._closing = False
+
         mode = os.fstat(self._fileno).st_mode
         if not (stat.S_ISFIFO(mode) or
                 stat.S_ISSOCK(mode) or
                 stat.S_ISCHR(mode)):
+            self._pipe = None
+            self._fileno = None
+            self._protocol = None
             raise ValueError("Pipe transport is for pipes/sockets only.")
+
         _set_nonblocking(self._fileno)
-        self._protocol = protocol
-        self._closing = False
+
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._fileno, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called


More information about the pypy-commit mailing list