[Python-checkins] bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312)

Yury Selivanov webhook-mailer at python.org
Thu Jan 25 18:08:12 EST 2018


https://github.com/python/cpython/commit/c9070d03f5169ad6e171e641b7fa8feab18bf229
commit: c9070d03f5169ad6e171e641b7fa8feab18bf229
branch: master
author: Yury Selivanov <yury at magic.io>
committer: GitHub <noreply at github.com>
date: 2018-01-25T18:08:09-05:00
summary:

bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312)

* bpo-32662: Implement Server.start_serving() and Server.serve_forever()

New methods:

* Server.start_serving(),
* Server.serve_forever(), and
* Server.is_serving().

Add 'start_serving' keyword parameter to loop.create_server() and
loop.create_unix_server().

files:
A Lib/test/test_asyncio/test_server.py
A Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst
M Doc/library/asyncio-eventloop.rst
M Lib/asyncio/base_events.py
M Lib/asyncio/events.py
M Lib/asyncio/unix_events.py

diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst
index 6cee171b65a..834a4e85c2f 100644
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -424,7 +424,7 @@ Creating connections
 Creating listening connections
 ------------------------------
 
-.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
 
    Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
    *host* and *port*.
@@ -472,9 +472,15 @@ Creating listening connections
      for the SSL handshake to complete before aborting the connection.
      ``10.0`` seconds if ``None`` (default).
 
+   * *start_serving* set to ``True`` (the default) causes the created server
+     to start accepting connections immediately.  When set to ``False``,
+     the user should await on :meth:`Server.start_serving` or
+     :meth:`Server.serve_forever` to make the server to start accepting
+     connections.
+
    .. versionadded:: 3.7
 
-      The *ssl_handshake_timeout* parameter.
+      *ssl_handshake_timeout* and *start_serving* parameters.
 
    .. versionchanged:: 3.5
 
@@ -490,7 +496,7 @@ Creating listening connections
       The *host* parameter can now be a sequence of strings.
 
 
-.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None)
+.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
 
    Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
    socket family :py:data:`~socket.AF_UNIX`.
@@ -929,8 +935,26 @@ Server
 
    Server listening on sockets.
 
-   Object created by the :meth:`AbstractEventLoop.create_server` method and the
-   :func:`start_server` function. Don't instantiate the class directly.
+   Object created by :meth:`AbstractEventLoop.create_server`,
+   :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
+   and :func:`start_unix_server` functions.  Don't instantiate the class
+   directly.
+
+   *Server* objects are asynchronous context managers.  When used in an
+   ``async with`` statement, it's guaranteed that the Server object is
+   closed and not accepting new connections when the ``async with``
+   statement is completed::
+
+      srv = await loop.create_server(...)
+
+      async with srv:
+          # some code
+
+      # At this point, srv is closed and no longer accepts new connections.
+
+
+   .. versionchanged:: 3.7
+      Server object is an asynchronous context manager since Python 3.7.
 
    .. method:: close()
 
@@ -949,6 +973,54 @@ Server
 
       .. versionadded:: 3.7
 
+   .. coroutinemethod:: start_serving()
+
+      Start accepting connections.
+
+      This method is idempotent, so it can be called when
+      the server is already being serving.
+
+      The new *start_serving* keyword-only parameter to
+      :meth:`AbstractEventLoop.create_server` and
+      :meth:`asyncio.start_server` allows to create a Server object
+      that is not accepting connections right away.  In which case
+      this method, or :meth:`Server.serve_forever` can be used
+      to make the Server object to start accepting connections.
+
+      .. versionadded:: 3.7
+
+   .. coroutinemethod:: serve_forever()
+
+      Start accepting connections until the coroutine is cancelled.
+      Cancellation of ``serve_forever`` task causes the server
+      to be closed.
+
+      This method can be called if the server is already accepting
+      connections.  Only one ``serve_forever`` task can exist per
+      one *Server* object.
+
+      Example::
+
+          async def client_connected(reader, writer):
+              # Communicate with the client with
+              # reader/writer streams.  For example:
+              await reader.readline()
+
+          async def main(host, port):
+              srv = await asyncio.start_server(
+                  client_connected, host, port)
+              await loop.serve_forever()
+
+          asyncio.run(main('127.0.0.1', 0))
+
+      .. versionadded:: 3.7
+
+   .. method:: is_serving()
+
+      Return ``True`` if the server is accepting new connections.
+
+      .. versionadded:: 3.7
+
    .. coroutinemethod:: wait_closed()
 
       Wait until the :meth:`close` method completes.
@@ -958,6 +1030,11 @@ Server
       List of :class:`socket.socket` objects the server is listening to, or
       ``None`` if the server is closed.
 
+      .. versionchanged:: 3.7
+         Prior to Python 3.7 ``Server.sockets`` used to return the
+         internal list of server's sockets directly.  In 3.7 a copy
+         of that list is returned.
+
 
 Handle
 ------
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index e722cf26b51..94eb3089e93 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -157,47 +157,106 @@ def _run_until_complete_cb(fut):
 
 class Server(events.AbstractServer):
 
-    def __init__(self, loop, sockets):
+    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
+                 ssl_handshake_timeout):
         self._loop = loop
-        self.sockets = sockets
+        self._sockets = sockets
         self._active_count = 0
         self._waiters = []
+        self._protocol_factory = protocol_factory
+        self._backlog = backlog
+        self._ssl_context = ssl_context
+        self._ssl_handshake_timeout = ssl_handshake_timeout
+        self._serving = False
+        self._serving_forever_fut = None
 
     def __repr__(self):
         return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
 
     def _attach(self):
-        assert self.sockets is not None
+        assert self._sockets is not None
         self._active_count += 1
 
     def _detach(self):
         assert self._active_count > 0
         self._active_count -= 1
-        if self._active_count == 0 and self.sockets is None:
+        if self._active_count == 0 and self._sockets is None:
             self._wakeup()
 
+    def _wakeup(self):
+        waiters = self._waiters
+        self._waiters = None
+        for waiter in waiters:
+            if not waiter.done():
+                waiter.set_result(waiter)
+
+    def _start_serving(self):
+        if self._serving:
+            return
+        self._serving = True
+        for sock in self._sockets:
+            sock.listen(self._backlog)
+            self._loop._start_serving(
+                self._protocol_factory, sock, self._ssl_context,
+                self, self._backlog, self._ssl_handshake_timeout)
+
+    def get_loop(self):
+        return self._loop
+
+    def is_serving(self):
+        return self._serving
+
+    @property
+    def sockets(self):
+        if self._sockets is None:
+            return []
+        return list(self._sockets)
+
     def close(self):
-        sockets = self.sockets
+        sockets = self._sockets
         if sockets is None:
             return
-        self.sockets = None
+        self._sockets = None
+
         for sock in sockets:
             self._loop._stop_serving(sock)
+
+        self._serving = False
+
+        if (self._serving_forever_fut is not None and
+                not self._serving_forever_fut.done()):
+            self._serving_forever_fut.cancel()
+            self._serving_forever_fut = None
+
         if self._active_count == 0:
             self._wakeup()
 
-    def get_loop(self):
-        return self._loop
+    async def start_serving(self):
+        self._start_serving()
 
-    def _wakeup(self):
-        waiters = self._waiters
-        self._waiters = None
-        for waiter in waiters:
-            if not waiter.done():
-                waiter.set_result(waiter)
+    async def serve_forever(self):
+        if self._serving_forever_fut is not None:
+            raise RuntimeError(
+                f'server {self!r} is already being awaited on serve_forever()')
+        if self._sockets is None:
+            raise RuntimeError(f'server {self!r} is closed')
+
+        self._start_serving()
+        self._serving_forever_fut = self._loop.create_future()
+
+        try:
+            await self._serving_forever_fut
+        except futures.CancelledError:
+            try:
+                self.close()
+                await self.wait_closed()
+            finally:
+                raise
+        finally:
+            self._serving_forever_fut = None
 
     async def wait_closed(self):
-        if self.sockets is None or self._waiters is None:
+        if self._sockets is None or self._waiters is None:
             return
         waiter = self._loop.create_future()
         self._waiters.append(waiter)
@@ -1059,7 +1118,8 @@ def _check_sendfile_params(self, sock, file, offset, count):
             ssl=None,
             reuse_address=None,
             reuse_port=None,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            start_serving=True):
         """Create a TCP server.
 
         The host parameter can be a string, in that case the TCP server is
@@ -1149,12 +1209,14 @@ def _check_sendfile_params(self, sock, file, offset, count):
                 raise ValueError(f'A Stream Socket was expected, got {sock!r}')
             sockets = [sock]
 
-        server = Server(self, sockets)
         for sock in sockets:
-            sock.listen(backlog)
             sock.setblocking(False)
-            self._start_serving(protocol_factory, sock, ssl, server, backlog,
-                                ssl_handshake_timeout)
+
+        server = Server(self, sockets, protocol_factory,
+                        ssl, backlog, ssl_handshake_timeout)
+        if start_serving:
+            server._start_serving()
+
         if self._debug:
             logger.info("%r is serving", server)
         return server
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 5c68d4cb97d..7aa3de02c95 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -164,13 +164,39 @@ def close(self):
         """Stop serving.  This leaves existing connections open."""
         raise NotImplementedError
 
+    def get_loop(self):
+        """Get the event loop the Server object is attached to."""
+        raise NotImplementedError
+
+    def is_serving(self):
+        """Return True if the server is accepting connections."""
+        raise NotImplementedError
+
+    async def start_serving(self):
+        """Start accepting connections.
+
+        This method is idempotent, so it can be called when
+        the server is already being serving.
+        """
+        raise NotImplementedError
+
+    async def serve_forever(self):
+        """Start accepting connections until the coroutine is cancelled.
+
+        The server is closed when the coroutine is cancelled.
+        """
+        raise NotImplementedError
+
     async def wait_closed(self):
         """Coroutine to wait until service is closed."""
         raise NotImplementedError
 
-    def get_loop(self):
-        """ Get the event loop the Server object is attached to."""
-        raise NotImplementedError
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, *exc):
+        self.close()
+        await self.wait_closed()
 
 
 class AbstractEventLoop:
@@ -279,7 +305,8 @@ def set_default_executor(self, executor):
             *, family=socket.AF_UNSPEC,
             flags=socket.AI_PASSIVE, sock=None, backlog=100,
             ssl=None, reuse_address=None, reuse_port=None,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            start_serving=True):
         """A coroutine which creates a TCP server bound to host and port.
 
         The return value is a Server object which can be used to stop
@@ -319,6 +346,11 @@ def set_default_executor(self, executor):
         will wait for completion of the SSL handshake before aborting the
         connection. Default is 10s, longer timeouts may increase vulnerability
         to DoS attacks (see https://support.f5.com/csp/article/K13834)
+
+        start_serving set to True (default) causes the created server
+        to start accepting connections immediately.  When set to False,
+        the user should await Server.start_serving() or Server.serve_forever()
+        to make the server to start accepting connections.
         """
         raise NotImplementedError
 
@@ -343,7 +375,8 @@ def set_default_executor(self, executor):
     async def create_unix_server(
             self, protocol_factory, path=None, *,
             sock=None, backlog=100, ssl=None,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            start_serving=True):
         """A coroutine which creates a UNIX Domain Socket server.
 
         The return value is a Server object, which can be used to stop
@@ -363,6 +396,11 @@ def set_default_executor(self, executor):
 
         ssl_handshake_timeout is the time in seconds that an SSL server
         will wait for the SSL handshake to complete (defaults to 10s).
+
+        start_serving set to True (default) causes the created server
+        to start accepting connections immediately.  When set to False,
+        the user should await Server.start_serving() or Server.serve_forever()
+        to make the server to start accepting connections.
         """
         raise NotImplementedError
 
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 9b9d0043b50..a4d892acad0 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -250,7 +250,8 @@ def _child_watcher_callback(self, pid, returncode, transp):
     async def create_unix_server(
             self, protocol_factory, path=None, *,
             sock=None, backlog=100, ssl=None,
-            ssl_handshake_timeout=None):
+            ssl_handshake_timeout=None,
+            start_serving=True):
         if isinstance(ssl, bool):
             raise TypeError('ssl argument must be an SSLContext or None')
 
@@ -302,11 +303,12 @@ def _child_watcher_callback(self, pid, returncode, transp):
                 raise ValueError(
                     f'A UNIX Domain Stream Socket was expected, got {sock!r}')
 
-        server = base_events.Server(self, [sock])
-        sock.listen(backlog)
         sock.setblocking(False)
-        self._start_serving(protocol_factory, sock, ssl, server,
-                            ssl_handshake_timeout=ssl_handshake_timeout)
+        server = base_events.Server(self, [sock], protocol_factory,
+                                    ssl, backlog, ssl_handshake_timeout)
+        if start_serving:
+            server._start_serving()
+
         return server
 
     async def _sock_sendfile_native(self, sock, file, offset, count):
diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py
new file mode 100644
index 00000000000..44d135db105
--- /dev/null
+++ b/Lib/test/test_asyncio/test_server.py
@@ -0,0 +1,117 @@
+import asyncio
+import socket
+import threading
+import unittest
+
+from test.test_asyncio import utils as test_utils
+from test.test_asyncio import functional as func_tests
+
+
+class BaseStartServer(func_tests.FunctionalTestCaseMixin):
+
+    def new_loop(self):
+        raise NotImplementedError
+
+    def test_start_server_1(self):
+        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+
+        def client(sock, addr):
+            sock.connect(addr)
+            sock.send(HELLO_MSG)
+            sock.recv_all(1)
+            sock.close()
+
+        async def serve(reader, writer):
+            await reader.readline()
+            main_task.cancel()
+            writer.write(b'1')
+            writer.close()
+            await writer.wait_closed()
+
+        async def main(srv):
+            async with srv:
+                await srv.serve_forever()
+
+        srv = self.loop.run_until_complete(asyncio.start_server(
+            serve, '127.0.0.1', 0, loop=self.loop, start_serving=False))
+
+        self.assertFalse(srv.is_serving())
+
+        main_task = self.loop.create_task(main(srv))
+
+        addr = srv.sockets[0].getsockname()
+        with self.assertRaises(asyncio.CancelledError):
+            with self.tcp_client(lambda sock: client(sock, addr)):
+                self.loop.run_until_complete(main_task)
+
+        self.assertEqual(srv.sockets, [])
+
+        self.assertIsNone(srv._sockets)
+        self.assertIsNone(srv._waiters)
+        self.assertFalse(srv.is_serving())
+
+        with self.assertRaisesRegex(RuntimeError, r'is closed'):
+            self.loop.run_until_complete(srv.serve_forever())
+
+
+class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
+
+    def new_loop(self):
+        return asyncio.SelectorEventLoop()
+
+    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'no Unix sockets')
+    def test_start_unix_server_1(self):
+        HELLO_MSG = b'1' * 1024 * 5 + b'\n'
+        started = threading.Event()
+
+        def client(sock, addr):
+            started.wait(5)
+            sock.connect(addr)
+            sock.send(HELLO_MSG)
+            sock.recv_all(1)
+            sock.close()
+
+        async def serve(reader, writer):
+            await reader.readline()
+            main_task.cancel()
+            writer.write(b'1')
+            writer.close()
+            await writer.wait_closed()
+
+        async def main(srv):
+            async with srv:
+                self.assertFalse(srv.is_serving())
+                await srv.start_serving()
+                self.assertTrue(srv.is_serving())
+                started.set()
+                await srv.serve_forever()
+
+        with test_utils.unix_socket_path() as addr:
+            srv = self.loop.run_until_complete(asyncio.start_unix_server(
+                serve, addr, loop=self.loop, start_serving=False))
+
+            main_task = self.loop.create_task(main(srv))
+
+            with self.assertRaises(asyncio.CancelledError):
+                with self.unix_client(lambda sock: client(sock, addr)):
+                    self.loop.run_until_complete(main_task)
+
+            self.assertEqual(srv.sockets, [])
+
+            self.assertIsNone(srv._sockets)
+            self.assertIsNone(srv._waiters)
+            self.assertFalse(srv.is_serving())
+
+            with self.assertRaisesRegex(RuntimeError, r'is closed'):
+                self.loop.run_until_complete(srv.serve_forever())
+
+
+ at unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
+class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
+
+    def new_loop(self):
+        return asyncio.ProactorEventLoop()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst
new file mode 100644
index 00000000000..44c8b951030
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-01-25-01-45-30.bpo-32662.oabhd8.rst
@@ -0,0 +1,3 @@
+Implement Server.start_serving(), Server.serve_forever(), and
+Server.is_serving() methods.  Add 'start_serving' keyword parameter to
+loop.create_server() and loop.create_unix_server().



More information about the Python-checkins mailing list