[Python-checkins] bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)

Miss Islington (bot) webhook-mailer at python.org
Tue May 29 18:37:10 EDT 2018


https://github.com/python/cpython/commit/d8948c5e09c4a2a818f6f6cfaf8064f2c2138fa5
commit: d8948c5e09c4a2a818f6f6cfaf8064f2c2138fa5
branch: 3.7
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: GitHub <noreply at github.com>
date: 2018-05-29T15:37:06-07:00
summary:

bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)


Currently, asyncio.wait_for(fut), upon reaching the timeout deadline,
cancels the future and returns immediately.  This is problematic for
when *fut* is a Task, because it will be left running for an arbitrary
amount of time.  This behavior is iself surprising and may lead to
related bugs such as the one described in bpo-33638:

    condition = asyncio.Condition()
    async with condition:
        await asyncio.wait_for(condition.wait(), timeout=0.5)

Currently, instead of raising a TimeoutError, the above code will fail
with `RuntimeError: cannot wait on un-acquired lock`, because
`__aexit__` is reached _before_ `condition.wait()` finishes its
cancellation and re-acquires the condition lock.

To resolve this, make `wait_for` await for the task cancellation.
The tradeoff here is that the `timeout` promise may be broken if the
task decides to handle its cancellation in a slow way.  This represents
a behavior change and should probably not be back-patched to 3.6 and
earlier.
(cherry picked from commit e2b340ab4196e1beb902327f503574b5d7369185)

Co-authored-by: Elvis Pranskevichus <elvis at magic.io>

files:
A Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst
M Doc/library/asyncio-task.rst
M Lib/asyncio/tasks.py
M Lib/test/test_asyncio/test_locks.py
M Lib/test/test_asyncio/test_tasks.py

diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index dc450c375aad..3121b4718332 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -790,7 +790,9 @@ Task functions
 
    Returns result of the Future or coroutine.  When a timeout occurs, it
    cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
-   cancellation, wrap it in :func:`shield`.
+   cancellation, wrap it in :func:`shield`.  The function will wait until
+   the future is actually cancelled, so the total wait time may exceed
+   the *timeout*.
 
    If the wait is cancelled, the future *fut* is also cancelled.
 
@@ -800,3 +802,8 @@ Task functions
 
    .. versionchanged:: 3.4.3
       If the wait is cancelled, the future *fut* is now also cancelled.
+
+   .. versionchanged:: 3.7
+      When *fut* is cancelled due to a timeout, ``wait_for`` now waits
+      for *fut* to be cancelled.  Previously,
+      it raised :exc:`~asyncio.TimeoutError` immediately.
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 6cef33d5212e..72792a25cf55 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -412,14 +412,17 @@ def _release_waiter(waiter, *args):
             return fut.result()
         else:
             fut.remove_done_callback(cb)
-            fut.cancel()
+            # We must ensure that the task is not running
+            # after wait_for() returns.
+            # See https://bugs.python.org/issue32751
+            await _cancel_and_wait(fut, loop=loop)
             raise futures.TimeoutError()
     finally:
         timeout_handle.cancel()
 
 
 async def _wait(fs, timeout, return_when, loop):
-    """Internal helper for wait() and wait_for().
+    """Internal helper for wait().
 
     The fs argument must be a collection of Futures.
     """
@@ -461,6 +464,22 @@ def _on_completion(f):
     return done, pending
 
 
+async def _cancel_and_wait(fut, loop):
+    """Cancel the *fut* future or task and wait until it completes."""
+
+    waiter = loop.create_future()
+    cb = functools.partial(_release_waiter, waiter)
+    fut.add_done_callback(cb)
+
+    try:
+        fut.cancel()
+        # We cannot wait on *fut* directly to make
+        # sure _cancel_and_wait itself is reliably cancellable.
+        await waiter
+    finally:
+        fut.remove_done_callback(cb)
+
+
 # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
 def as_completed(fs, *, loop=None, timeout=None):
     """Return an iterator whose values are coroutines.
diff --git a/Lib/test/test_asyncio/test_locks.py b/Lib/test/test_asyncio/test_locks.py
index 8642aa86b92b..b8d155e1d034 100644
--- a/Lib/test/test_asyncio/test_locks.py
+++ b/Lib/test/test_asyncio/test_locks.py
@@ -807,6 +807,19 @@ def test_ambiguous_loops(self):
         with self.assertRaises(ValueError):
             asyncio.Condition(lock, loop=loop)
 
+    def test_timeout_in_block(self):
+        loop = asyncio.new_event_loop()
+        self.addCleanup(loop.close)
+
+        async def task_timeout():
+            condition = asyncio.Condition(loop=loop)
+            async with condition:
+                with self.assertRaises(asyncio.TimeoutError):
+                    await asyncio.wait_for(condition.wait(), timeout=0.5,
+                                           loop=loop)
+
+        loop.run_until_complete(task_timeout())
+
 
 class SemaphoreTests(test_utils.TestCase):
 
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index 1280584d318c..1282a98c218f 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -789,6 +789,62 @@ def gen():
         res = loop.run_until_complete(task)
         self.assertEqual(res, "ok")
 
+    def test_wait_for_waits_for_task_cancellation(self):
+        loop = asyncio.new_event_loop()
+        self.addCleanup(loop.close)
+
+        task_done = False
+
+        async def foo():
+            async def inner():
+                nonlocal task_done
+                try:
+                    await asyncio.sleep(0.2, loop=loop)
+                finally:
+                    task_done = True
+
+            inner_task = self.new_task(loop, inner())
+
+            with self.assertRaises(asyncio.TimeoutError):
+                await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
+
+            self.assertTrue(task_done)
+
+        loop.run_until_complete(foo())
+
+    def test_wait_for_self_cancellation(self):
+        loop = asyncio.new_event_loop()
+        self.addCleanup(loop.close)
+
+        async def foo():
+            async def inner():
+                try:
+                    await asyncio.sleep(0.3, loop=loop)
+                except asyncio.CancelledError:
+                    try:
+                        await asyncio.sleep(0.3, loop=loop)
+                    except asyncio.CancelledError:
+                        await asyncio.sleep(0.3, loop=loop)
+
+                return 42
+
+            inner_task = self.new_task(loop, inner())
+
+            wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
+
+            # Test that wait_for itself is properly cancellable
+            # even when the initial task holds up the initial cancellation.
+            task = self.new_task(loop, wait)
+            await asyncio.sleep(0.2, loop=loop)
+            task.cancel()
+
+            with self.assertRaises(asyncio.CancelledError):
+                await task
+
+            self.assertEqual(await inner_task, 42)
+
+        loop.run_until_complete(foo())
+
     def test_wait(self):
 
         def gen():
diff --git a/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst b/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst
new file mode 100644
index 000000000000..3e27cd461ca8
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-05-29-15-32-18.bpo-32751.oBTqr7.rst
@@ -0,0 +1,2 @@
+When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
+wait until the cancellation is complete.



More information about the Python-checkins mailing list