[Python-checkins] bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)

Andrew Svetlov webhook-mailer at python.org
Thu May 9 15:15:04 EDT 2019

commit: a076e4f5e42b85664693191d04cfb33e2f9acfa5
branch: master
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-05-09T15:14:58-04:00

bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)

A Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst
M Doc/library/asyncio-stream.rst
M Lib/asyncio/streams.py
M Lib/test/test_asyncio/test_streams.py

diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst
index e686a6a1c4cd..e735b81f234d 100644
--- a/Doc/library/asyncio-stream.rst
+++ b/Doc/library/asyncio-stream.rst
@@ -22,13 +22,13 @@ streams::
             '', 8888)
         print(f'Send: {message!r}')
-        await writer.awrite(message.encode())
+        await writer.write(message.encode())
         data = await reader.read(100)
         print(f'Received: {data.decode()!r}')
         print('Close the connection')
-        await writer.aclose()
+        await writer.close()
     asyncio.run(tcp_echo_client('Hello World!'))
@@ -226,23 +226,70 @@ StreamWriter
    directly; use :func:`open_connection` and :func:`start_server`
-   .. coroutinemethod:: awrite(data)
+   .. method:: write(data)
+      The method attempts to write the *data* to the underlying socket immediately.
+      If that fails, the data is queued in an internal write buffer until it can be
+      sent.
+      Starting with Python 3.8, it is possible to directly await on the `write()`
+      method::
+         await stream.write(data)
+      The ``await`` pauses the current coroutine until the data is written to the
+      socket.
+      Below is equivalent code that works with Python <= 3.7::
+         stream.write(data)
+         await stream.drain()
+      .. versionchanged:: 3.8
+         Support ``await stream.write(...)`` syntax.
+   .. method:: writelines(data)
+      The method writes a list (or any iterable) of bytes to the underlying socket
+      immediately.
+      If that fails, the data is queued in an internal write buffer until it can be
+      sent.
+      Starting with Python 3.8, it is possible to directly await on the `writelines()`
+      method::
+         await stream.writelines(lines)
+      The ``await`` pauses the current coroutine until the data is written to the
+      socket.
+      Below is equivalent code that works with Python <= 3.7::
-      Write *data* to the stream.
+         stream.writelines(lines)
+         await stream.drain()
-      The method respects flow control, execution is paused if the write
-      buffer reaches the high watermark.
+      .. versionchanged:: 3.8
+         Support ``await stream.writelines()`` syntax.
-      .. versionadded:: 3.8
+   .. method:: close()
+      The method closes the stream and the underlying socket.
+      Starting with Python 3.8, it is possible to directly await on the `close()`
+      method::
+         await stream.close()
-   .. coroutinemethod:: aclose()
+      The ``await`` pauses the current coroutine until the stream and the underlying
+      socket are closed (and SSL shutdown is performed for a secure connection).
-      Close the stream.
+      Below is equivalent code that works with Python <= 3.7::
-      Wait until all closing actions are complete, e.g. SSL shutdown for
-      secure sockets.
+         stream.close()
+         await stream.wait_closed()
-      .. versionadded:: 3.8
+      .. versionchanged:: 3.8
+         Support ``await stream.close()`` syntax.
    .. method:: can_write_eof()
@@ -263,21 +310,6 @@ StreamWriter
       Access optional transport information; see
       :meth:`BaseTransport.get_extra_info` for details.
-   .. method:: write(data)
-      Write *data* to the stream.
-      This method is not subject to flow control.  Calls to ``write()`` should
-      be followed by :meth:`drain`.  The :meth:`awrite` method is a
-      recommended alternative the applies flow control automatically.
-   .. method:: writelines(data)
-      Write a list (or any iterable) of bytes to the stream.
-      This method is not subject to flow control. Calls to ``writelines()``
-      should be followed by :meth:`drain`.
    .. coroutinemethod:: drain()
       Wait until it is appropriate to resume writing to the stream.
@@ -293,10 +325,6 @@ StreamWriter
       be resumed.  When there is nothing to wait for, the :meth:`drain`
       returns immediately.
-   .. method:: close()
-      Close the stream.
    .. method:: is_closing()
       Return ``True`` if the stream is closed or in the process of
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 79adf028212f..d9a9f5e72d3b 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -352,6 +352,8 @@ def __init__(self, transport, protocol, reader, loop,
         assert reader is None or isinstance(reader, StreamReader)
         self._reader = reader
         self._loop = loop
+        self._complete_fut = self._loop.create_future()
+        self._complete_fut.set_result(None)
     def __repr__(self):
         info = [self.__class__.__name__, f'transport={self._transport!r}']
@@ -365,9 +367,33 @@ def transport(self):
     def write(self, data):
+        return self._fast_drain()
     def writelines(self, data):
+        return self._fast_drain()
+    def _fast_drain(self):
+        # Fast-path helper: return the already-completed future when the
+        # protocol is not paused, so no waiting for a resume-writing signal
+        # is needed; otherwise fall back to scheduling drain() as a task.
+        if self._reader is not None:
+            # this branch will be simplified after merging reader with writer
+            exc = self._reader.exception()
+            if exc is not None:
+                fut = self._loop.create_future()
+                fut.set_exception(exc)
+                return fut
+        if not self._transport.is_closing():
+            if self._protocol._connection_lost:
+                fut = self._loop.create_future()
+                fut.set_exception(ConnectionResetError('Connection lost'))
+                return fut
+            if not self._protocol._paused:
+                # fast path, the stream is not paused
+                # no need to wait for resume signal
+                return self._complete_fut
+        return self._loop.create_task(self.drain())
     def write_eof(self):
         return self._transport.write_eof()
@@ -377,6 +403,7 @@ def can_write_eof(self):
     def close(self):
+        return self._protocol._get_close_waiter(self)
     def is_closing(self):
         return self._transport.is_closing()
@@ -408,14 +435,6 @@ def get_extra_info(self, name, default=None):
             raise ConnectionResetError('Connection lost')
         await self._protocol._drain_helper()
-    async def aclose(self):
-        self.close()
-        await self.wait_closed()
-    async def awrite(self, data):
-        self.write(data)
-        await self.drain()
 class StreamReader:
diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py
index 905141ca89c7..bf93f30e1aaf 100644
--- a/Lib/test/test_asyncio/test_streams.py
+++ b/Lib/test/test_asyncio/test_streams.py
@@ -1035,24 +1035,42 @@ def test_del_stream_before_connection_made(self):
     def test_async_writer_api(self):
+        async def inner(httpd):
+            rd, wr = await asyncio.open_connection(*httpd.address)
+            await wr.write(b'GET / HTTP/1.0\r\n\r\n')
+            data = await rd.readline()
+            self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
+            data = await rd.read()
+            self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
+            await wr.close()
         messages = []
         self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
         with test_utils.run_test_server() as httpd:
-            rd, wr = self.loop.run_until_complete(
-                asyncio.open_connection(*httpd.address,
-                                        loop=self.loop))
+            self.loop.run_until_complete(inner(httpd))
-            f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n')
-            self.loop.run_until_complete(f)
-            f = rd.readline()
-            data = self.loop.run_until_complete(f)
+        self.assertEqual(messages, [])
+    def test_async_writer_api(self):
+        async def inner(httpd):
+            rd, wr = await asyncio.open_connection(*httpd.address)
+            await wr.write(b'GET / HTTP/1.0\r\n\r\n')
+            data = await rd.readline()
             self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
-            f = rd.read()
-            data = self.loop.run_until_complete(f)
+            data = await rd.read()
             self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
-            f = wr.aclose()
-            self.loop.run_until_complete(f)
+            wr.close()
+            with self.assertRaises(ConnectionResetError):
+                await wr.write(b'data')
+        messages = []
+        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
+        with test_utils.run_test_server() as httpd:
+            self.loop.run_until_complete(inner(httpd))
         self.assertEqual(messages, [])
@@ -1066,7 +1084,7 @@ def test_eof_feed_when_closing_writer(self):
-            f = wr.aclose()
+            f = wr.close()
             assert rd.at_eof()
             f = rd.read()
diff --git a/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst
new file mode 100644
index 000000000000..f59863b7b7a8
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst
@@ -0,0 +1,2 @@
+Provide both sync and async calls for StreamWriter.write() and
+StreamWriter.close()

More information about the Python-checkins mailing list