[Python-checkins] cpython (merge 3.5 -> 3.6): Merge 3.5 (issue #28369)

yury.selivanov python-checkins at python.org
Wed Oct 5 17:50:24 EDT 2016


https://hg.python.org/cpython/rev/f3c1d8869dd5
changeset:   104303:f3c1d8869dd5
branch:      3.6
parent:      104300:fb6b60955d62
parent:      104302:2b502f624753
user:        Yury Selivanov <yury at magic.io>
date:        Wed Oct 05 17:49:54 2016 -0400
summary:
  Merge 3.5 (issue #28369)

files:
  Lib/asyncio/selector_events.py                |  127 ++++++---
  Lib/asyncio/test_utils.py                     |   42 ++-
  Lib/asyncio/unix_events.py                    |   26 +-
  Lib/test/test_asyncio/test_base_events.py     |   24 +-
  Lib/test/test_asyncio/test_selector_events.py |   64 ++--
  Misc/NEWS                                     |    3 +
  6 files changed, 177 insertions(+), 109 deletions(-)


diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -11,6 +11,7 @@
 import functools
 import socket
 import warnings
+import weakref
 try:
     import ssl
 except ImportError:  # pragma: no cover
@@ -64,6 +65,7 @@
         logger.debug('Using selector: %s', selector.__class__.__name__)
         self._selector = selector
         self._make_self_pipe()
+        self._transports = weakref.WeakValueDictionary()
 
     def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                extra=None, server=None):
@@ -115,7 +117,7 @@
         raise NotImplementedError
 
     def _close_self_pipe(self):
-        self.remove_reader(self._ssock.fileno())
+        self._remove_reader(self._ssock.fileno())
         self._ssock.close()
         self._ssock = None
         self._csock.close()
@@ -128,7 +130,7 @@
         self._ssock.setblocking(False)
         self._csock.setblocking(False)
         self._internal_fds += 1
-        self.add_reader(self._ssock.fileno(), self._read_from_self)
+        self._add_reader(self._ssock.fileno(), self._read_from_self)
 
     def _process_self_data(self, data):
         pass
@@ -163,8 +165,8 @@
 
     def _start_serving(self, protocol_factory, sock,
                        sslcontext=None, server=None, backlog=100):
-        self.add_reader(sock.fileno(), self._accept_connection,
-                        protocol_factory, sock, sslcontext, server, backlog)
+        self._add_reader(sock.fileno(), self._accept_connection,
+                         protocol_factory, sock, sslcontext, server, backlog)
 
     def _accept_connection(self, protocol_factory, sock,
                            sslcontext=None, server=None, backlog=100):
@@ -194,7 +196,7 @@
                         'exception': exc,
                         'socket': sock,
                     })
-                    self.remove_reader(sock.fileno())
+                    self._remove_reader(sock.fileno())
                     self.call_later(constants.ACCEPT_RETRY_DELAY,
                                     self._start_serving,
                                     protocol_factory, sock, sslcontext, server,
@@ -244,8 +246,18 @@
                     context['transport'] = transport
                 self.call_exception_handler(context)
 
-    def add_reader(self, fd, callback, *args):
-        """Add a reader callback."""
+    def _ensure_fd_no_transport(self, fd):
+        try:
+            transport = self._transports[fd]
+        except KeyError:
+            pass
+        else:
+            if not transport.is_closing():
+                raise RuntimeError(
+                    'File descriptor {!r} is used by transport {!r}'.format(
+                        fd, transport))
+
+    def _add_reader(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -260,8 +272,7 @@
             if reader is not None:
                 reader.cancel()
 
-    def remove_reader(self, fd):
-        """Remove a reader callback."""
+    def _remove_reader(self, fd):
         if self.is_closed():
             return False
         try:
@@ -282,8 +293,7 @@
             else:
                 return False
 
-    def add_writer(self, fd, callback, *args):
-        """Add a writer callback.."""
+    def _add_writer(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -298,7 +308,7 @@
             if writer is not None:
                 writer.cancel()
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         """Remove a writer callback."""
         if self.is_closed():
             return False
@@ -321,6 +331,26 @@
             else:
                 return False
 
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback.."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def sock_recv(self, sock, n):
         """Receive data from the socket.
 
@@ -494,17 +524,17 @@
             fileobj, (reader, writer) = key.fileobj, key.data
             if mask & selectors.EVENT_READ and reader is not None:
                 if reader._cancelled:
-                    self.remove_reader(fileobj)
+                    self._remove_reader(fileobj)
                 else:
                     self._add_callback(reader)
             if mask & selectors.EVENT_WRITE and writer is not None:
                 if writer._cancelled:
-                    self.remove_writer(fileobj)
+                    self._remove_writer(fileobj)
                 else:
                     self._add_callback(writer)
 
     def _stop_serving(self, sock):
-        self.remove_reader(sock.fileno())
+        self._remove_reader(sock.fileno())
         sock.close()
 
 
@@ -539,6 +569,7 @@
         self._closing = False  # Set when close() called.
         if self._server is not None:
             self._server._attach()
+        loop._transports[self._sock_fd] = self
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -584,10 +615,10 @@
         if self._closing:
             return
         self._closing = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if not self._buffer:
             self._conn_lost += 1
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._loop.call_soon(self._call_connection_lost, None)
 
     # On Python 3.3 and older, objects with a destructor part of a reference
@@ -619,10 +650,10 @@
             return
         if self._buffer:
             self._buffer.clear()
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
         if not self._closing:
             self._closing = True
-            self._loop.remove_reader(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
         self._conn_lost += 1
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -659,7 +690,7 @@
 
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -672,7 +703,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -682,7 +713,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -706,7 +737,7 @@
                     # We're keeping the connection open so the
                     # protocol can write more, but we still can't
                     # receive more, so remove the reader callback.
-                    self._loop.remove_reader(self._sock_fd)
+                    self._loop._remove_reader(self._sock_fd)
                 else:
                     self.close()
 
@@ -739,7 +770,7 @@
                 if not data:
                     return
             # Not all was written; register write handler.
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -755,7 +786,7 @@
         except (BlockingIOError, InterruptedError):
             pass
         except Exception as exc:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._buffer.clear()
             self._fatal_error(exc, 'Fatal write error on socket transport')
         else:
@@ -763,7 +794,7 @@
                 del self._buffer[:n]
             self._maybe_resume_protocol()  # May append to buffer.
             if not self._buffer:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 if self._closing:
                     self._call_connection_lost(None)
                 elif self._eof:
@@ -834,19 +865,19 @@
         try:
             self._sock.do_handshake()
         except ssl.SSLWantReadError:
-            self._loop.add_reader(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_reader(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except ssl.SSLWantWriteError:
-            self._loop.add_writer(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_writer(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except BaseException as exc:
             if self._loop.get_debug():
                 logger.warning("%r: SSL handshake failed",
                                self, exc_info=True)
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._sock.close()
             self._wakeup_waiter(exc)
             if isinstance(exc, Exception):
@@ -854,8 +885,8 @@
             else:
                 raise
 
-        self._loop.remove_reader(self._sock_fd)
-        self._loop.remove_writer(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
+        self._loop._remove_writer(self._sock_fd)
 
         peercert = self._sock.getpeercert()
         if not hasattr(self._sslcontext, 'check_hostname'):
@@ -883,7 +914,7 @@
 
         self._read_wants_write = False
         self._write_wants_read = False
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         self._protocol_connected = True
         self._loop.call_soon(self._protocol.connection_made, self)
         # only wake up the waiter when connection_made() has been called
@@ -905,7 +936,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -915,7 +946,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -927,7 +958,7 @@
             self._write_ready()
 
             if self._buffer:
-                self._loop.add_writer(self._sock_fd, self._write_ready)
+                self._loop._add_writer(self._sock_fd, self._write_ready)
 
         try:
             data = self._sock.recv(self.max_size)
@@ -935,8 +966,8 @@
             pass
         except ssl.SSLWantWriteError:
             self._read_wants_write = True
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
         except Exception as exc:
             self._fatal_error(exc, 'Fatal read error on SSL transport')
         else:
@@ -961,7 +992,7 @@
             self._read_ready()
 
             if not (self._paused or self._closing):
-                self._loop.add_reader(self._sock_fd, self._read_ready)
+                self._loop._add_reader(self._sock_fd, self._read_ready)
 
         if self._buffer:
             try:
@@ -970,10 +1001,10 @@
                 n = 0
             except ssl.SSLWantReadError:
                 n = 0
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._write_wants_read = True
             except Exception as exc:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._buffer.clear()
                 self._fatal_error(exc, 'Fatal write error on SSL transport')
                 return
@@ -984,7 +1015,7 @@
         self._maybe_resume_protocol()  # May append to buffer.
 
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
 
@@ -1002,7 +1033,7 @@
             return
 
         if not self._buffer:
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -1022,7 +1053,7 @@
         self._address = address
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -1072,7 +1103,7 @@
                     self._sock.sendto(data, addr)
                 return
             except (BlockingIOError, InterruptedError):
-                self._loop.add_writer(self._sock_fd, self._sendto_ready)
+                self._loop._add_writer(self._sock_fd, self._sendto_ready)
             except OSError as exc:
                 self._protocol.error_received(exc)
                 return
@@ -1106,6 +1137,6 @@
 
         self._maybe_resume_protocol()  # May append to buffer.
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -13,6 +13,8 @@
 import threading
 import time
 import unittest
+import weakref
+
 from unittest import mock
 
 from http.server import HTTPServer
@@ -300,6 +302,8 @@
         self.writers = {}
         self.reset_counters()
 
+        self._transports = weakref.WeakValueDictionary()
+
     def time(self):
         return self._time
 
@@ -318,10 +322,10 @@
             else:  # pragma: no cover
                 raise AssertionError("Time generator is not finished")
 
-    def add_reader(self, fd, callback, *args):
+    def _add_reader(self, fd, callback, *args):
         self.readers[fd] = events.Handle(callback, args, self)
 
-    def remove_reader(self, fd):
+    def _remove_reader(self, fd):
         self.remove_reader_count[fd] += 1
         if fd in self.readers:
             del self.readers[fd]
@@ -337,10 +341,10 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
-    def add_writer(self, fd, callback, *args):
+    def _add_writer(self, fd, callback, *args):
         self.writers[fd] = events.Handle(callback, args, self)
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         self.remove_writer_count[fd] += 1
         if fd in self.writers:
             del self.writers[fd]
@@ -356,6 +360,36 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
+    def _ensure_fd_no_transport(self, fd):
+        try:
+            transport = self._transports[fd]
+        except KeyError:
+            pass
+        else:
+            raise RuntimeError(
+                'File descriptor {!r} is used by transport {!r}'.format(
+                    fd, transport))
+
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback.."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def reset_counters(self):
         self.remove_reader_count = collections.defaultdict(int)
         self.remove_writer_count = collections.defaultdict(int)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -321,7 +321,7 @@
 
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._fileno, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -364,15 +364,15 @@
                 if self._loop.get_debug():
                     logger.info("%r was closed by peer", self)
                 self._closing = True
-                self._loop.remove_reader(self._fileno)
+                self._loop._remove_reader(self._fileno)
                 self._loop.call_soon(self._protocol.eof_received)
                 self._loop.call_soon(self._call_connection_lost, None)
 
     def pause_reading(self):
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
 
     def resume_reading(self):
-        self._loop.add_reader(self._fileno, self._read_ready)
+        self._loop._add_reader(self._fileno, self._read_ready)
 
     def set_protocol(self, protocol):
         self._protocol = protocol
@@ -413,7 +413,7 @@
 
     def _close(self, exc):
         self._closing = True
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
         self._loop.call_soon(self._call_connection_lost, exc)
 
     def _call_connection_lost(self, exc):
@@ -458,7 +458,7 @@
         # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
         if is_socket or (is_fifo and not sys.platform.startswith("aix")):
             # only start reading when connection_made() has been called
-            self._loop.call_soon(self._loop.add_reader,
+            self._loop.call_soon(self._loop._add_reader,
                                  self._fileno, self._read_ready)
 
         if waiter is not None:
@@ -531,7 +531,7 @@
                 return
             elif n > 0:
                 data = memoryview(data)[n:]
-            self._loop.add_writer(self._fileno, self._write_ready)
+            self._loop._add_writer(self._fileno, self._write_ready)
 
         self._buffer += data
         self._maybe_pause_protocol()
@@ -548,15 +548,15 @@
             self._conn_lost += 1
             # Remove writer here, _fatal_error() doesn't it
             # because _buffer is empty.
-            self._loop.remove_writer(self._fileno)
+            self._loop._remove_writer(self._fileno)
             self._fatal_error(exc, 'Fatal write error on pipe transport')
         else:
             if n == len(self._buffer):
                 self._buffer.clear()
-                self._loop.remove_writer(self._fileno)
+                self._loop._remove_writer(self._fileno)
                 self._maybe_resume_protocol()  # May append to buffer.
                 if self._closing:
-                    self._loop.remove_reader(self._fileno)
+                    self._loop._remove_reader(self._fileno)
                     self._call_connection_lost(None)
                 return
             elif n > 0:
@@ -571,7 +571,7 @@
         assert self._pipe
         self._closing = True
         if not self._buffer:
-            self._loop.remove_reader(self._fileno)
+            self._loop._remove_reader(self._fileno)
             self._loop.call_soon(self._call_connection_lost, None)
 
     def set_protocol(self, protocol):
@@ -618,9 +618,9 @@
     def _close(self, exc=None):
         self._closing = True
         if self._buffer:
-            self._loop.remove_writer(self._fileno)
+            self._loop._remove_writer(self._fileno)
         self._buffer.clear()
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
         self._loop.call_soon(self._call_connection_lost, exc)
 
     def _call_connection_lost(self, exc):
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -1148,10 +1148,10 @@
         m_socket.getaddrinfo = socket.getaddrinfo
         sock = m_socket.socket.return_value
 
-        self.loop.add_reader = mock.Mock()
-        self.loop.add_reader._is_coroutine = False
-        self.loop.add_writer = mock.Mock()
-        self.loop.add_writer._is_coroutine = False
+        self.loop._add_reader = mock.Mock()
+        self.loop._add_reader._is_coroutine = False
+        self.loop._add_writer = mock.Mock()
+        self.loop._add_writer._is_coroutine = False
 
         coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
         t, p = self.loop.run_until_complete(coro)
@@ -1194,10 +1194,10 @@
         m_socket.getaddrinfo = socket.getaddrinfo
         sock = m_socket.socket.return_value
 
-        self.loop.add_reader = mock.Mock()
-        self.loop.add_reader._is_coroutine = False
-        self.loop.add_writer = mock.Mock()
-        self.loop.add_writer._is_coroutine = False
+        self.loop._add_reader = mock.Mock()
+        self.loop._add_reader._is_coroutine = False
+        self.loop._add_writer = mock.Mock()
+        self.loop._add_writer._is_coroutine = False
 
         for service, port in ('http', 80), (b'http', 80):
             coro = self.loop.create_connection(asyncio.Protocol,
@@ -1614,8 +1614,8 @@
 
         m_socket.getaddrinfo = getaddrinfo
         m_socket.socket.return_value.bind = bind = mock.Mock()
-        self.loop.add_reader = mock.Mock()
-        self.loop.add_reader._is_coroutine = False
+        self.loop._add_reader = mock.Mock()
+        self.loop._add_reader._is_coroutine = False
 
         reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
         coro = self.loop.create_datagram_endpoint(
@@ -1646,13 +1646,13 @@
         sock = mock.Mock()
         sock.fileno.return_value = 10
         sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
-        self.loop.remove_reader = mock.Mock()
+        self.loop._remove_reader = mock.Mock()
         self.loop.call_later = mock.Mock()
 
         self.loop._accept_connection(MyProto, sock)
         self.assertTrue(m_log.error.called)
         self.assertFalse(sock.close.called)
-        self.loop.remove_reader.assert_called_with(10)
+        self.loop._remove_reader.assert_called_with(10)
         self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
                                                 # self.loop._start_serving
                                                 mock.ANY,
diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py
--- a/Lib/test/test_asyncio/test_selector_events.py
+++ b/Lib/test/test_asyncio/test_selector_events.py
@@ -72,11 +72,11 @@
     @unittest.skipIf(ssl is None, 'No ssl module')
     def test_make_ssl_transport(self):
         m = mock.Mock()
-        self.loop.add_reader = mock.Mock()
-        self.loop.add_reader._is_coroutine = False
-        self.loop.add_writer = mock.Mock()
-        self.loop.remove_reader = mock.Mock()
-        self.loop.remove_writer = mock.Mock()
+        self.loop._add_reader = mock.Mock()
+        self.loop._add_reader._is_coroutine = False
+        self.loop._add_writer = mock.Mock()
+        self.loop._remove_reader = mock.Mock()
+        self.loop._remove_writer = mock.Mock()
         waiter = asyncio.Future(loop=self.loop)
         with test_utils.disable_logger():
             transport = self.loop._make_ssl_transport(
@@ -119,7 +119,7 @@
         ssock.fileno.return_value = 7
         csock = self.loop._csock
         csock.fileno.return_value = 1
-        remove_reader = self.loop.remove_reader = mock.Mock()
+        remove_reader = self.loop._remove_reader = mock.Mock()
 
         self.loop._selector.close()
         self.loop._selector = selector = mock.Mock()
@@ -651,12 +651,12 @@
         reader = mock.Mock()
         reader.cancelled = True
 
-        self.loop.remove_reader = mock.Mock()
+        self.loop._remove_reader = mock.Mock()
         self.loop._process_events(
             [(selectors.SelectorKey(
                 1, 1, selectors.EVENT_READ, (reader, None)),
              selectors.EVENT_READ)])
-        self.loop.remove_reader.assert_called_with(1)
+        self.loop._remove_reader.assert_called_with(1)
 
     def test_process_events_write(self):
         writer = mock.Mock()
@@ -672,13 +672,13 @@
     def test_process_events_write_cancelled(self):
         writer = mock.Mock()
         writer.cancelled = True
-        self.loop.remove_writer = mock.Mock()
+        self.loop._remove_writer = mock.Mock()
 
         self.loop._process_events(
             [(selectors.SelectorKey(1, 1, selectors.EVENT_WRITE,
                                     (None, writer)),
               selectors.EVENT_WRITE)])
-        self.loop.remove_writer.assert_called_with(1)
+        self.loop._remove_writer.assert_called_with(1)
 
     def test_accept_connection_multiple(self):
         sock = mock.Mock()
@@ -747,8 +747,8 @@
     def test_force_close(self):
         tr = self.create_transport()
         tr._buffer.extend(b'1')
-        self.loop.add_reader(7, mock.sentinel)
-        self.loop.add_writer(7, mock.sentinel)
+        self.loop._add_reader(7, mock.sentinel)
+        self.loop._add_writer(7, mock.sentinel)
         tr._force_close(None)
 
         self.assertTrue(tr.is_closing())
@@ -1037,7 +1037,7 @@
 
         transport = self.socket_transport()
         transport._buffer.extend(data)
-        self.loop.add_writer(7, transport._write_ready)
+        self.loop._add_writer(7, transport._write_ready)
         transport._write_ready()
         self.assertTrue(self.sock.send.called)
         self.assertFalse(self.loop.writers)
@@ -1049,7 +1049,7 @@
         transport = self.socket_transport()
         transport._closing = True
         transport._buffer.extend(data)
-        self.loop.add_writer(7, transport._write_ready)
+        self.loop._add_writer(7, transport._write_ready)
         transport._write_ready()
         self.assertTrue(self.sock.send.called)
         self.assertFalse(self.loop.writers)
@@ -1067,7 +1067,7 @@
 
         transport = self.socket_transport()
         transport._buffer.extend(data)
-        self.loop.add_writer(7, transport._write_ready)
+        self.loop._add_writer(7, transport._write_ready)
         transport._write_ready()
         self.loop.assert_writer(7, transport._write_ready)
         self.assertEqual(list_to_buffer([b'ta']), transport._buffer)
@@ -1078,7 +1078,7 @@
 
         transport = self.socket_transport()
         transport._buffer.extend(data)
-        self.loop.add_writer(7, transport._write_ready)
+        self.loop._add_writer(7, transport._write_ready)
         transport._write_ready()
         self.loop.assert_writer(7, transport._write_ready)
         self.assertEqual(list_to_buffer([b'data']), transport._buffer)
@@ -1088,7 +1088,7 @@
 
         transport = self.socket_transport()
         transport._buffer = list_to_buffer([b'data1', b'data2'])
-        self.loop.add_writer(7, transport._write_ready)
+        self.loop._add_writer(7, transport._write_ready)
         transport._write_ready()
 
         self.loop.assert_writer(7, transport._write_ready)
@@ -1130,7 +1130,7 @@
 
     @mock.patch('asyncio.base_events.logger')
     def test_transport_close_remove_writer(self, m_log):
-        remove_writer = self.loop.remove_writer = mock.Mock()
+        remove_writer = self.loop._remove_writer = mock.Mock()
 
         transport = self.socket_transport()
         transport.close()
@@ -1288,7 +1288,7 @@
         self.assertEqual((b'data',), self.protocol.data_received.call_args[0])
 
     def test_read_ready_write_wants_read(self):
-        self.loop.add_writer = mock.Mock()
+        self.loop._add_writer = mock.Mock()
         self.sslsock.recv.side_effect = BlockingIOError
         transport = self._make_one()
         transport._write_wants_read = True
@@ -1298,7 +1298,7 @@
 
         self.assertFalse(transport._write_wants_read)
         transport._write_ready.assert_called_with()
-        self.loop.add_writer.assert_called_with(
+        self.loop._add_writer.assert_called_with(
             transport._sock_fd, transport._write_ready)
 
     def test_read_ready_recv_eof(self):
@@ -1333,16 +1333,16 @@
         self.assertFalse(self.protocol.data_received.called)
 
     def test_read_ready_recv_write(self):
-        self.loop.remove_reader = mock.Mock()
-        self.loop.add_writer = mock.Mock()
+        self.loop._remove_reader = mock.Mock()
+        self.loop._add_writer = mock.Mock()
         self.sslsock.recv.side_effect = ssl.SSLWantWriteError
         transport = self._make_one()
         transport._read_ready()
         self.assertFalse(self.protocol.data_received.called)
         self.assertTrue(transport._read_wants_write)
 
-        self.loop.remove_reader.assert_called_with(transport._sock_fd)
-        self.loop.add_writer.assert_called_with(
+        self.loop._remove_reader.assert_called_with(transport._sock_fd)
+        self.loop._add_writer.assert_called_with(
             transport._sock_fd, transport._write_ready)
 
     def test_read_ready_recv_exc(self):
@@ -1419,12 +1419,12 @@
         transport = self._make_one()
         transport._buffer = list_to_buffer([b'data'])
 
-        self.loop.remove_writer = mock.Mock()
+        self.loop._remove_writer = mock.Mock()
         self.sslsock.send.side_effect = ssl.SSLWantReadError
         transport._write_ready()
         self.assertFalse(self.protocol.data_received.called)
         self.assertTrue(transport._write_wants_read)
-        self.loop.remove_writer.assert_called_with(transport._sock_fd)
+        self.loop._remove_writer.assert_called_with(transport._sock_fd)
 
     def test_write_ready_send_exc(self):
         err = self.sslsock.send.side_effect = OSError()
@@ -1439,7 +1439,7 @@
         self.assertEqual(list_to_buffer(), transport._buffer)
 
     def test_write_ready_read_wants_write(self):
-        self.loop.add_reader = mock.Mock()
+        self.loop._add_reader = mock.Mock()
         self.sslsock.send.side_effect = BlockingIOError
         transport = self._make_one()
         transport._read_wants_write = True
@@ -1448,7 +1448,7 @@
 
         self.assertFalse(transport._read_wants_write)
         transport._read_ready.assert_called_with()
-        self.loop.add_reader.assert_called_with(
+        self.loop._add_reader.assert_called_with(
             transport._sock_fd, transport._read_ready)
 
     def test_write_eof(self):
@@ -1699,7 +1699,7 @@
 
         transport = self.datagram_transport()
         transport._buffer.append((data, ('0.0.0.0', 12345)))
-        self.loop.add_writer(7, transport._sendto_ready)
+        self.loop._add_writer(7, transport._sendto_ready)
         transport._sendto_ready()
         self.assertTrue(self.sock.sendto.called)
         self.assertEqual(
@@ -1713,7 +1713,7 @@
         transport = self.datagram_transport()
         transport._closing = True
         transport._buffer.append((data, ()))
-        self.loop.add_writer(7, transport._sendto_ready)
+        self.loop._add_writer(7, transport._sendto_ready)
         transport._sendto_ready()
         self.sock.sendto.assert_called_with(data, ())
         self.assertFalse(self.loop.writers)
@@ -1722,7 +1722,7 @@
 
     def test_sendto_ready_no_data(self):
         transport = self.datagram_transport()
-        self.loop.add_writer(7, transport._sendto_ready)
+        self.loop._add_writer(7, transport._sendto_ready)
         transport._sendto_ready()
         self.assertFalse(self.sock.sendto.called)
         self.assertFalse(self.loop.writers)
@@ -1732,7 +1732,7 @@
 
         transport = self.datagram_transport()
         transport._buffer.extend([(b'data1', ()), (b'data2', ())])
-        self.loop.add_writer(7, transport._sendto_ready)
+        self.loop._add_writer(7, transport._sendto_ready)
         transport._sendto_ready()
 
         self.loop.assert_writer(7, transport._sendto_ready)
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -167,6 +167,9 @@
   loop attached.
   Patch by Vincent Michel.
 
+- Issue #28369: Raise RuntimeError when transport's FD is used with
+  add_reader, add_writer, etc.
+
 Windows
 -------
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list