[Python-checkins] bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)

Andrew Svetlov webhook-mailer at python.org
Sat Dec 16 14:58:41 EST 2017


https://github.com/python/cpython/commit/44d1a5912ea629aa20fdc377a5ab69d9ccf75d61
commit: 44d1a5912ea629aa20fdc377a5ab69d9ccf75d61
branch: master
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: GitHub <noreply at github.com>
date: 2017-12-16T21:58:38+02:00
summary:

bpo-32250: Implement asyncio.current_task() and asyncio.all_tasks() (#4799)

files:
A Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst
M Doc/library/asyncio-task.rst
M Lib/asyncio/tasks.py
M Lib/test/test_asyncio/test_tasks.py
M Modules/_asynciomodule.c
M Modules/clinic/_asynciomodule.c.h

diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index 72fae5e8559..d85dddfa02e 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -528,6 +528,28 @@ Task functions
    the event loop object used by the underlying task or coroutine.  If it's
    not provided, the default event loop is used.
 
+
+.. function:: current_task(loop=None):
+
+   Return the current running :class:`Task` instance or ``None``, if
+   no task is running.
+
+   If *loop* is ``None`` :func:`get_running_loop` is used to get
+   the current loop.
+
+   .. versionadded:: 3.7
+
+
+.. function:: all_tasks(loop=None):
+
+   Return a set of :class:`Task` objects created for the loop.
+
+   If *loop* is ``None`` :func:`get_event_loop` is used for getting
+   current loop.
+
+   .. versionadded:: 3.7
+
+
 .. function:: as_completed(fs, \*, loop=None, timeout=None)
 
    Return an iterator whose values, when waited for, are :class:`Future`
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index 172057e5a29..cdb483ae0eb 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -5,6 +5,8 @@
     'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
     'wait', 'wait_for', 'as_completed', 'sleep',
     'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
+    'current_task', 'all_tasks',
+    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
 )
 
 import concurrent.futures
@@ -21,6 +23,20 @@
 from .coroutines import coroutine
 
 
+def current_task(loop=None):
+    """Return a currently executed task."""
+    if loop is None:
+        loop = events.get_running_loop()
+    return _current_tasks.get(loop)
+
+
+def all_tasks(loop=None):
+    """Return a set of all tasks for the loop."""
+    if loop is None:
+        loop = events.get_event_loop()
+    return {t for t, l in _all_tasks.items() if l is loop}
+
+
 class Task(futures.Future):
     """A coroutine wrapped in a Future."""
 
@@ -33,13 +49,6 @@ class Task(futures.Future):
     # _wakeup().  When _fut_waiter is not None, one of its callbacks
     # must be _wakeup().
 
-    # Weak set containing all tasks alive.
-    _all_tasks = weakref.WeakSet()
-
-    # Dictionary containing tasks that are currently active in
-    # all running event loops.  {EventLoop: Task}
-    _current_tasks = {}
-
     # If False, don't log a message if the task is destroyed whereas its
     # status is still pending
     _log_destroy_pending = True
@@ -52,9 +61,13 @@ def current_task(cls, loop=None):
 
         None is returned when called not in the context of a Task.
         """
+        warnings.warn("Task.current_task() is deprecated, "
+                      "use asyncio.current_task() instead",
+                      PendingDeprecationWarning,
+                      stacklevel=2)
         if loop is None:
             loop = events.get_event_loop()
-        return cls._current_tasks.get(loop)
+        return current_task(loop)
 
     @classmethod
     def all_tasks(cls, loop=None):
@@ -62,9 +75,11 @@ def all_tasks(cls, loop=None):
 
         By default all tasks for the current event loop are returned.
         """
-        if loop is None:
-            loop = events.get_event_loop()
-        return {t for t in cls._all_tasks if t._loop is loop}
+        warnings.warn("Task.all_tasks() is deprecated, "
+                      "use asyncio.all_tasks() instead",
+                      PendingDeprecationWarning,
+                      stacklevel=2)
+        return all_tasks(loop)
 
     def __init__(self, coro, *, loop=None):
         super().__init__(loop=loop)
@@ -81,7 +96,7 @@ def __init__(self, coro, *, loop=None):
         self._coro = coro
 
         self._loop.call_soon(self._step)
-        self.__class__._all_tasks.add(self)
+        _register_task(self._loop, self)
 
     def __del__(self):
         if self._state == futures._PENDING and self._log_destroy_pending:
@@ -173,7 +188,7 @@ def _step(self, exc=None):
         coro = self._coro
         self._fut_waiter = None
 
-        self.__class__._current_tasks[self._loop] = self
+        _enter_task(self._loop, self)
         # Call either coro.throw(exc) or coro.send(None).
         try:
             if exc is None:
@@ -237,7 +252,7 @@ def _step(self, exc=None):
                 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                 self._loop.call_soon(self._step, new_exc)
         finally:
-            self.__class__._current_tasks.pop(self._loop)
+            _leave_task(self._loop, self)
             self = None  # Needed to break cycles when an exception occurs.
 
     def _wakeup(self, future):
@@ -715,3 +730,61 @@ def callback():
 
     loop.call_soon_threadsafe(callback)
     return future
+
+
+# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
+# Task should be a weak reference to remove entry on task garbage
+# collection, EventLoop is required
+# to not access to private task._loop attribute.
+_all_tasks = weakref.WeakKeyDictionary()
+
+# Dictionary containing tasks that are currently active in
+# all running event loops.  {EventLoop: Task}
+_current_tasks = {}
+
+
+def _register_task(loop, task):
+    """Register a new task in asyncio as executed by loop.
+
+    Returns None.
+    """
+    _all_tasks[task] = loop
+
+
+def _enter_task(loop, task):
+    current_task = _current_tasks.get(loop)
+    if current_task is not None:
+        raise RuntimeError(f"Cannot enter into task {task!r} while another "
+                           f"task {current_task!r} is being executed.")
+    _current_tasks[loop] = task
+
+
+def _leave_task(loop, task):
+    current_task = _current_tasks.get(loop)
+    if current_task is not task:
+        raise RuntimeError(f"Leaving task {task!r} does not match "
+                           f"the current task {current_task!r}.")
+    del _current_tasks[loop]
+
+
+def _unregister_task(loop, task):
+    _all_tasks.pop(task, None)
+
+
+_py_register_task = _register_task
+_py_unregister_task = _unregister_task
+_py_enter_task = _enter_task
+_py_leave_task = _leave_task
+
+
+try:
+    from _asyncio import (_register_task, _unregister_task,
+                          _enter_task, _leave_task,
+                          _all_tasks, _current_tasks)
+except ImportError:
+    pass
+else:
+    _c_register_task = _register_task
+    _c_unregister_task = _unregister_task
+    _c_enter_task = _enter_task
+    _c_leave_task = _leave_task
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index a32dca13118..5429facbbcd 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -1493,53 +1493,69 @@ def coro():
         self.assertEqual(res, 'test')
         self.assertIsNone(t2.result())
 
-    def test_current_task(self):
+
+    def test_current_task_deprecated(self):
         Task = self.__class__.Task
 
-        self.assertIsNone(Task.current_task(loop=self.loop))
+        with self.assertWarns(PendingDeprecationWarning):
+            self.assertIsNone(Task.current_task(loop=self.loop))
 
-        @asyncio.coroutine
-        def coro(loop):
-            self.assertTrue(Task.current_task(loop=loop) is task)
+        async def coro(loop):
+            with self.assertWarns(PendingDeprecationWarning):
+                self.assertIs(Task.current_task(loop=loop), task)
 
             # See http://bugs.python.org/issue29271 for details:
             asyncio.set_event_loop(loop)
             try:
-                self.assertIs(Task.current_task(None), task)
-                self.assertIs(Task.current_task(), task)
+                with self.assertWarns(PendingDeprecationWarning):
+                    self.assertIs(Task.current_task(None), task)
+                with self.assertWarns(PendingDeprecationWarning):
+                    self.assertIs(Task.current_task(), task)
             finally:
                 asyncio.set_event_loop(None)
 
         task = self.new_task(self.loop, coro(self.loop))
         self.loop.run_until_complete(task)
-        self.assertIsNone(Task.current_task(loop=self.loop))
+        with self.assertWarns(PendingDeprecationWarning):
+            self.assertIsNone(Task.current_task(loop=self.loop))
 
-    def test_current_task_with_interleaving_tasks(self):
-        Task = self.__class__.Task
+    def test_current_task(self):
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
 
-        self.assertIsNone(Task.current_task(loop=self.loop))
+        async def coro(loop):
+            self.assertIs(asyncio.current_task(loop=loop), task)
+
+            self.assertIs(asyncio.current_task(None), task)
+            self.assertIs(asyncio.current_task(), task)
+
+        task = self.new_task(self.loop, coro(self.loop))
+        self.loop.run_until_complete(task)
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
+
+    def test_current_task_with_interleaving_tasks(self):
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
 
         fut1 = self.new_future(self.loop)
         fut2 = self.new_future(self.loop)
 
         async def coro1(loop):
-            self.assertTrue(Task.current_task(loop=loop) is task1)
+            self.assertTrue(asyncio.current_task(loop=loop) is task1)
             await fut1
-            self.assertTrue(Task.current_task(loop=loop) is task1)
+            self.assertTrue(asyncio.current_task(loop=loop) is task1)
             fut2.set_result(True)
 
         async def coro2(loop):
-            self.assertTrue(Task.current_task(loop=loop) is task2)
+            self.assertTrue(asyncio.current_task(loop=loop) is task2)
             fut1.set_result(True)
             await fut2
-            self.assertTrue(Task.current_task(loop=loop) is task2)
+            self.assertTrue(asyncio.current_task(loop=loop) is task2)
 
         task1 = self.new_task(self.loop, coro1(self.loop))
         task2 = self.new_task(self.loop, coro2(self.loop))
 
         self.loop.run_until_complete(asyncio.wait((task1, task2),
                                                   loop=self.loop))
-        self.assertIsNone(Task.current_task(loop=self.loop))
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
 
     # Some thorough tests for cancellation propagation through
     # coroutines, tasks and wait().
@@ -1826,6 +1842,16 @@ def foo():
         self.assertIsInstance(exception, Exception)
         self.assertEqual(exception.args, ("foo", ))
 
+    def test_all_tasks_deprecated(self):
+        Task = self.__class__.Task
+
+        async def coro():
+            with self.assertWarns(PendingDeprecationWarning):
+                assert Task.all_tasks(self.loop) == {t}
+
+        t = self.new_task(self.loop, coro())
+        self.loop.run_until_complete(t)
+
     def test_log_destroyed_pending_task(self):
         Task = self.__class__.Task
 
@@ -1845,13 +1871,13 @@ def kill_me(loop):
         coro = kill_me(self.loop)
         task = asyncio.ensure_future(coro, loop=self.loop)
 
-        self.assertEqual(Task.all_tasks(loop=self.loop), {task})
+        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
 
         # See http://bugs.python.org/issue29271 for details:
         asyncio.set_event_loop(self.loop)
         try:
-            self.assertEqual(Task.all_tasks(), {task})
-            self.assertEqual(Task.all_tasks(None), {task})
+            self.assertEqual(asyncio.all_tasks(), {task})
+            self.assertEqual(asyncio.all_tasks(None), {task})
         finally:
             asyncio.set_event_loop(None)
 
@@ -1868,7 +1894,7 @@ def kill_me(loop):
         # no more reference to kill_me() task: the task is destroyed by the GC
         support.gc_collect()
 
-        self.assertEqual(Task.all_tasks(loop=self.loop), set())
+        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
 
         mock_handler.assert_called_with(self.loop, {
             'message': 'Task was destroyed but it is pending!',
@@ -2052,7 +2078,7 @@ def coro():
         message = m_log.error.call_args[0][0]
         self.assertIn('Task was destroyed but it is pending', message)
 
-        self.assertEqual(self.Task.all_tasks(self.loop), set())
+        self.assertEqual(asyncio.all_tasks(self.loop), set())
 
     def test_create_task_with_noncoroutine(self):
         with self.assertRaisesRegex(TypeError,
@@ -2201,6 +2227,140 @@ class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
     Future = futures._PyFuture
 
 
+class BaseTaskIntrospectionTests:
+    _register_task = None
+    _unregister_task = None
+    _enter_task = None
+    _leave_task = None
+
+    def test__register_task(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        self.assertEqual(asyncio.all_tasks(loop), set())
+        self._register_task(loop, task)
+        self.assertEqual(asyncio.all_tasks(loop), {task})
+        self._unregister_task(loop, task)
+
+    def test__enter_task(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        self.assertIsNone(asyncio.current_task(loop))
+        self._enter_task(loop, task)
+        self.assertIs(asyncio.current_task(loop), task)
+        self._leave_task(loop, task)
+
+    def test__enter_task_failure(self):
+        task1 = mock.Mock()
+        task2 = mock.Mock()
+        loop = mock.Mock()
+        self._enter_task(loop, task1)
+        with self.assertRaises(RuntimeError):
+            self._enter_task(loop, task2)
+        self.assertIs(asyncio.current_task(loop), task1)
+        self._leave_task(loop, task1)
+
+    def test__leave_task(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        self._enter_task(loop, task)
+        self._leave_task(loop, task)
+        self.assertIsNone(asyncio.current_task(loop))
+
+    def test__leave_task_failure1(self):
+        task1 = mock.Mock()
+        task2 = mock.Mock()
+        loop = mock.Mock()
+        self._enter_task(loop, task1)
+        with self.assertRaises(RuntimeError):
+            self._leave_task(loop, task2)
+        self.assertIs(asyncio.current_task(loop), task1)
+        self._leave_task(loop, task1)
+
+    def test__leave_task_failure2(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        with self.assertRaises(RuntimeError):
+            self._leave_task(loop, task)
+        self.assertIsNone(asyncio.current_task(loop))
+
+    def test__unregister_task(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        self._register_task(loop, task)
+        self._unregister_task(loop, task)
+        self.assertEqual(asyncio.all_tasks(loop), set())
+
+    def test__unregister_task_not_registered(self):
+        task = mock.Mock()
+        loop = mock.Mock()
+        self._unregister_task(loop, task)
+        self.assertEqual(asyncio.all_tasks(loop), set())
+
+
+class PyIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests):
+    _register_task = staticmethod(tasks._py_register_task)
+    _unregister_task = staticmethod(tasks._py_unregister_task)
+    _enter_task = staticmethod(tasks._py_enter_task)
+    _leave_task = staticmethod(tasks._py_leave_task)
+
+
+ at unittest.skipUnless(hasattr(tasks, '_c_register_task'),
+                     'requires the C _asyncio module')
+class CIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests):
+    _register_task = staticmethod(tasks._c_register_task)
+    _unregister_task = staticmethod(tasks._c_unregister_task)
+    _enter_task = staticmethod(tasks._c_enter_task)
+    _leave_task = staticmethod(tasks._c_leave_task)
+
+
+class BaseCurrentLoopTests:
+
+    def setUp(self):
+        super().setUp()
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+
+    def tearDown(self):
+        self.loop.close()
+        asyncio.set_event_loop(None)
+        super().tearDown()
+
+    def new_task(self, coro):
+        raise NotImplementedError
+
+    def test_current_task_no_running_loop(self):
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
+
+    def test_current_task_no_running_loop_implicit(self):
+        with self.assertRaises(RuntimeError):
+            asyncio.current_task()
+
+    def test_current_task_with_implicit_loop(self):
+        async def coro():
+            self.assertIs(asyncio.current_task(loop=self.loop), task)
+
+            self.assertIs(asyncio.current_task(None), task)
+            self.assertIs(asyncio.current_task(), task)
+
+        task = self.new_task(coro())
+        self.loop.run_until_complete(task)
+        self.assertIsNone(asyncio.current_task(loop=self.loop))
+
+
+class PyCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase):
+
+    def new_task(self, coro):
+        return tasks._PyTask(coro, loop=self.loop)
+
+
+ at unittest.skipUnless(hasattr(tasks, '_CTask'),
+                     'requires the C _asyncio module')
+class CCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase):
+
+    def new_task(self, coro):
+        return getattr(tasks, '_CTask')(coro, loop=self.loop)
+
+
 class GenericTaskTests(test_utils.TestCase):
 
     def test_future_subclass(self):
@@ -2522,7 +2682,7 @@ def add(self, a, b, fail=False, cancel=False):
         if fail:
             raise RuntimeError("Fail!")
         if cancel:
-            asyncio.tasks.Task.current_task(self.loop).cancel()
+            asyncio.current_task(self.loop).cancel()
             yield
         return a + b
 
@@ -2568,7 +2728,7 @@ def test_run_coroutine_threadsafe_with_timeout(self):
             self.loop.run_until_complete(future)
         test_utils.run_briefly(self.loop)
         # Check that there's no pending task (add has been cancelled)
-        for task in asyncio.Task.all_tasks(self.loop):
+        for task in asyncio.all_tasks(self.loop):
             self.assertTrue(task.done())
 
     def test_run_coroutine_threadsafe_task_cancelled(self):
diff --git a/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst b/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst
new file mode 100644
index 00000000000..f2d016df39a
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-12-12-16-58-20.bpo-32250.UljTa0.rst
@@ -0,0 +1,5 @@
+Implement ``asyncio.current_task()`` and ``asyncio.all_tasks()``. Add
+helpers intended to be used by alternative task implementations:
+``asyncio._register_task``, ``asyncio._enter_task``, ``asyncio._leave_task``
+and ``asyncio._unregister_task``. Deprecate ``asyncio.Task.current_task()``
+and ``asyncio.Task.all_tasks()``.
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 9ac1c44d48d..378bd08b0c5 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -11,9 +11,12 @@ module _asyncio
 /* identifiers used from some functions */
 _Py_IDENTIFIER(__asyncio_running_event_loop__);
 _Py_IDENTIFIER(add_done_callback);
+_Py_IDENTIFIER(all_tasks);
 _Py_IDENTIFIER(call_soon);
 _Py_IDENTIFIER(cancel);
+_Py_IDENTIFIER(current_task);
 _Py_IDENTIFIER(get_event_loop);
+_Py_IDENTIFIER(pop);
 _Py_IDENTIFIER(send);
 _Py_IDENTIFIER(throw);
 _Py_IDENTIFIER(_step);
@@ -22,19 +25,29 @@ _Py_IDENTIFIER(_wakeup);
 
 
 /* State of the _asyncio module */
-static PyObject *all_tasks;
-static PyObject *current_tasks;
+static PyObject *asyncio_mod;
+static PyObject *inspect_isgenerator;
+static PyObject *os_getpid;
 static PyObject *traceback_extract_stack;
 static PyObject *asyncio_get_event_loop_policy;
-static PyObject *asyncio_iscoroutine_func;
 static PyObject *asyncio_future_repr_info_func;
-static PyObject *asyncio_task_repr_info_func;
+static PyObject *asyncio_iscoroutine_func;
 static PyObject *asyncio_task_get_stack_func;
 static PyObject *asyncio_task_print_stack_func;
+static PyObject *asyncio_task_repr_info_func;
 static PyObject *asyncio_InvalidStateError;
 static PyObject *asyncio_CancelledError;
-static PyObject *inspect_isgenerator;
-static PyObject *os_getpid;
+
+
+/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
+   Task should be a weak reference to remove entry on task garbage
+   collection, EventLoop is required
+   to not access to private task._loop attribute. */
+static PyObject *current_tasks;
+
+/* Dictionary containing tasks that are currently active in
+   all running event loops.  {EventLoop: Task} */
+static PyObject *all_tasks;
 
 
 typedef enum {
@@ -1445,6 +1458,80 @@ TaskWakeupMethWrapper_new(TaskObj *task)
     return (PyObject*) o;
 }
 
+/* ----- Task introspection helpers */
+
+static int
+register_task(PyObject *loop, PyObject *task)
+{
+    return PyObject_SetItem(all_tasks, task, loop);
+}
+
+
+static int
+unregister_task(PyObject *loop, PyObject *task)
+{
+    PyObject *res;
+
+    res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop,
+                                        task, Py_None, NULL);
+    if (res == NULL) {
+        return -1;
+    }
+    Py_DECREF(res);
+    return 0;
+}
+
+
+static int
+enter_task(PyObject *loop, PyObject *task)
+{
+    PyObject *item;
+    Py_hash_t hash;
+    hash = PyObject_Hash(loop);
+    if (hash == -1) {
+        return -1;
+    }
+    item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash);
+    if (item != NULL) {
+        PyErr_Format(
+            PyExc_RuntimeError,
+            "Cannot enter into task %R while another " \
+            "task %R is being executed.",
+            task, item, NULL);
+        return -1;
+    }
+    if (_PyDict_SetItem_KnownHash(current_tasks, loop, task, hash) < 0) {
+        return -1;
+    }
+    return 0;
+}
+
+
+static int
+leave_task(PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
+{
+    PyObject *item;
+    Py_hash_t hash;
+    hash = PyObject_Hash(loop);
+    if (hash == -1) {
+        return -1;
+    }
+    item = _PyDict_GetItem_KnownHash(current_tasks, loop, hash);
+    if (item != task) {
+        if (item == NULL) {
+            /* Not entered, replace with None */
+            item = Py_None;
+        }
+        PyErr_Format(
+            PyExc_RuntimeError,
+            "Leaving task %R does not match the current task %R.",
+            task, item, NULL);
+        return -1;
+    }
+    return _PyDict_DelItem_KnownHash(current_tasks, loop, hash);
+}
+
 /* ----- Task */
 
 /*[clinic input]
@@ -1463,8 +1550,6 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
 {
     PyObject *res;
     int tmp;
-    _Py_IDENTIFIER(add);
-
     if (future_init((FutureObj*)self, loop)) {
         return -1;
     }
@@ -1500,14 +1585,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
     if (task_call_step_soon(self, NULL)) {
         return -1;
     }
-
-    res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_add, self, NULL);
-    if (res == NULL) {
-        return -1;
-    }
-    Py_DECREF(res);
-
-    return 0;
+    return register_task(self->task_loop, (PyObject*)self);
 }
 
 static int
@@ -1600,76 +1678,36 @@ static PyObject *
 _asyncio_Task_current_task_impl(PyTypeObject *type, PyObject *loop)
 /*[clinic end generated code: output=99fbe7332c516e03 input=cd14770c5b79c7eb]*/
 {
-    PyObject *res;
+    PyObject *ret;
+    PyObject *current_task_func;
+
+    if (PyErr_WarnEx(PyExc_PendingDeprecationWarning,
+                     "Task.current_task() is deprecated, " \
+                     "use asyncio.current_task() instead",
+                     1) < 0) {
+        return NULL;
+    }
+
+    current_task_func = _PyObject_GetAttrId(asyncio_mod, &PyId_current_task);
+    if (current_task_func == NULL) {
+        return NULL;
+    }
 
     if (loop == Py_None) {
         loop = get_event_loop();
         if (loop == NULL) {
             return NULL;
         }
-
-        res = PyDict_GetItem(current_tasks, loop);
+        ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL);
+        Py_DECREF(current_task_func);
         Py_DECREF(loop);
+        return ret;
     }
     else {
-        res = PyDict_GetItem(current_tasks, loop);
-    }
-
-    if (res == NULL) {
-        Py_RETURN_NONE;
-    }
-    else {
-        Py_INCREF(res);
-        return res;
-    }
-}
-
-static PyObject *
-task_all_tasks(PyObject *loop)
-{
-    PyObject *task;
-    PyObject *task_loop;
-    PyObject *set;
-    PyObject *iter;
-
-    assert(loop != NULL);
-
-    set = PySet_New(NULL);
-    if (set == NULL) {
-        return NULL;
-    }
-
-    iter = PyObject_GetIter(all_tasks);
-    if (iter == NULL) {
-        goto fail;
-    }
-
-    while ((task = PyIter_Next(iter))) {
-        task_loop = PyObject_GetAttrString(task, "_loop");
-        if (task_loop == NULL) {
-            Py_DECREF(task);
-            goto fail;
-        }
-        if (task_loop == loop) {
-            if (PySet_Add(set, task) == -1) {
-                Py_DECREF(task_loop);
-                Py_DECREF(task);
-                goto fail;
-            }
-        }
-        Py_DECREF(task_loop);
-        Py_DECREF(task);
-    }
-    if (PyErr_Occurred()) {
-        goto fail;
+        ret = PyObject_CallFunctionObjArgs(current_task_func, loop, NULL);
+        Py_DECREF(current_task_func);
+        return ret;
     }
-    Py_DECREF(iter);
-    return set;
-
-fail:
-    Py_DECREF(set);
-    Py_XDECREF(iter);
-    return NULL;
 }
 
 /*[clinic input]
@@ -1688,20 +1726,22 @@ _asyncio_Task_all_tasks_impl(PyTypeObject *type, PyObject *loop)
 /*[clinic end generated code: output=11f9b20749ccca5d input=497f80bc9ce726b5]*/
 {
     PyObject *res;
+    PyObject *all_tasks_func;
 
-    if (loop == Py_None) {
-        loop = get_event_loop();
-        if (loop == NULL) {
-            return NULL;
-        }
-
-        res = task_all_tasks(loop);
-        Py_DECREF(loop);
+    all_tasks_func = _PyObject_GetAttrId(asyncio_mod, &PyId_all_tasks);
+    if (all_tasks_func == NULL) {
+        return NULL;
     }
-    else {
-        res = task_all_tasks(loop);
+
+    if (PyErr_WarnEx(PyExc_PendingDeprecationWarning,
+                     "Task.all_tasks() is deprecated, " \
+                     "use asyncio.all_tasks() instead",
+                     1) < 0) {
+        return NULL;
     }
 
+    res = PyObject_CallFunctionObjArgs(all_tasks_func, loop, NULL);
+    Py_DECREF(all_tasks_func);
     return res;
 }
 
@@ -2437,11 +2477,8 @@ static PyObject *
 task_step(TaskObj *task, PyObject *exc)
 {
     PyObject *res;
-    PyObject *ot;
 
-    if (PyDict_SetItem(current_tasks,
-                       task->task_loop, (PyObject*)task) == -1)
-    {
+    if (enter_task(task->task_loop, (PyObject*)task) < 0) {
         return NULL;
     }
 
@@ -2450,19 +2487,16 @@ task_step(TaskObj *task, PyObject *exc)
     if (res == NULL) {
         PyObject *et, *ev, *tb;
         PyErr_Fetch(&et, &ev, &tb);
-        ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
-        Py_XDECREF(ot);
+        leave_task(task->task_loop, (PyObject*)task);
         _PyErr_ChainExceptions(et, ev, tb);
         return NULL;
     }
     else {
-        ot = _PyDict_Pop(current_tasks, task->task_loop, NULL);
-        if (ot == NULL) {
+        if(leave_task(task->task_loop, (PyObject*)task) < 0) {
             Py_DECREF(res);
             return NULL;
         }
         else {
-            Py_DECREF(ot);
             return res;
         }
     }
@@ -2615,6 +2649,99 @@ _asyncio_get_running_loop_impl(PyObject *module)
     return loop;
 }
 
+/*[clinic input]
+_asyncio._register_task
+
+    loop: object
+    task: object
+
+Register a new task in asyncio as executed by loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__register_task_impl(PyObject *module, PyObject *loop,
+                             PyObject *task)
+/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/
+{
+    if (register_task(loop, task) < 0) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._unregister_task
+
+    loop: object
+    task: object
+
+Unregister a task.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
+                               PyObject *task)
+/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/
+{
+    if (unregister_task(loop, task) < 0) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._enter_task
+
+    loop: object
+    task: object
+
+Enter into task execution or resume suspended task.
+
+Task belongs to loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=a22611c858035b73 input=de1b06dca70d8737]*/
+{
+    if (enter_task(loop, task) < 0) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+
+/*[clinic input]
+_asyncio._leave_task
+
+    loop: object
+    task: object
+
+Leave task execution or suspend a task.
+
+Task belongs to loop.
+
+Returns None.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task)
+/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/
+{
+    if (leave_task(loop, task) < 0) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
 
 /*********************** Module **************************/
 
@@ -2622,26 +2749,37 @@ _asyncio_get_running_loop_impl(PyObject *module)
 static void
 module_free(void *m)
 {
-    Py_CLEAR(current_tasks);
-    Py_CLEAR(all_tasks);
+    Py_CLEAR(asyncio_mod);
+    Py_CLEAR(inspect_isgenerator);
+    Py_CLEAR(os_getpid);
     Py_CLEAR(traceback_extract_stack);
-    Py_CLEAR(asyncio_get_event_loop_policy);
     Py_CLEAR(asyncio_future_repr_info_func);
+    Py_CLEAR(asyncio_get_event_loop_policy);
     Py_CLEAR(asyncio_iscoroutine_func);
-    Py_CLEAR(asyncio_task_repr_info_func);
     Py_CLEAR(asyncio_task_get_stack_func);
     Py_CLEAR(asyncio_task_print_stack_func);
+    Py_CLEAR(asyncio_task_repr_info_func);
     Py_CLEAR(asyncio_InvalidStateError);
     Py_CLEAR(asyncio_CancelledError);
-    Py_CLEAR(inspect_isgenerator);
-    Py_CLEAR(os_getpid);
+
+    Py_CLEAR(current_tasks);
+    Py_CLEAR(all_tasks);
 }
 
 static int
 module_init(void)
 {
     PyObject *module = NULL;
-    PyObject *cls;
+
+    asyncio_mod = PyImport_ImportModule("asyncio");
+    if (asyncio_mod == NULL) {
+        goto fail;
+    }
+
+    current_tasks = PyDict_New();
+    if (current_tasks == NULL) {
+        goto fail;
+    }
 
 #define WITH_MOD(NAME) \
     Py_CLEAR(module); \
@@ -2681,19 +2819,15 @@ module_init(void)
     WITH_MOD("traceback")
     GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
 
+    PyObject *weak_key_dict;
     WITH_MOD("weakref")
-    GET_MOD_ATTR(cls, "WeakSet")
-    all_tasks = _PyObject_CallNoArg(cls);
-    Py_DECREF(cls);
+    GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary");
+    all_tasks = _PyObject_CallNoArg(weak_key_dict);
+    Py_CLEAR(weak_key_dict);
     if (all_tasks == NULL) {
         goto fail;
     }
 
-    current_tasks = PyDict_New();
-    if (current_tasks == NULL) {
-        goto fail;
-    }
-
     Py_DECREF(module);
     return 0;
 
@@ -2713,6 +2847,10 @@ static PyMethodDef asyncio_methods[] = {
     _ASYNCIO_GET_RUNNING_LOOP_METHODDEF
     _ASYNCIO__GET_RUNNING_LOOP_METHODDEF
     _ASYNCIO__SET_RUNNING_LOOP_METHODDEF
+    _ASYNCIO__REGISTER_TASK_METHODDEF
+    _ASYNCIO__UNREGISTER_TASK_METHODDEF
+    _ASYNCIO__ENTER_TASK_METHODDEF
+    _ASYNCIO__LEAVE_TASK_METHODDEF
     {NULL, NULL}
 };
 
@@ -2768,5 +2906,17 @@ PyInit__asyncio(void)
         return NULL;
     }
 
+    Py_INCREF(all_tasks);
+    if (PyModule_AddObject(m, "_all_tasks", all_tasks) < 0) {
+        Py_DECREF(all_tasks);
+        return NULL;
+    }
+
+    Py_INCREF(current_tasks);
+    if (PyModule_AddObject(m, "_current_tasks", current_tasks) < 0) {
+        Py_DECREF(current_tasks);
+        return NULL;
+    }
+
     return m;
 }
diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h
index 952316cc195..9d5dea52c8e 100644
--- a/Modules/clinic/_asynciomodule.c.h
+++ b/Modules/clinic/_asynciomodule.c.h
@@ -595,4 +595,142 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
 {
     return _asyncio_get_running_loop_impl(module);
 }
-/*[clinic end generated code: output=21e5424c3a5572b0 input=a9049054013a1b77]*/
+
+PyDoc_STRVAR(_asyncio__register_task__doc__,
+"_register_task($module, /, loop, task)\n"
+"--\n"
+"\n"
+"Register a new task in asyncio as executed by loop.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__REGISTER_TASK_METHODDEF    \
+    {"_register_task", (PyCFunction)_asyncio__register_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__register_task__doc__},
+
+static PyObject *
+_asyncio__register_task_impl(PyObject *module, PyObject *loop,
+                             PyObject *task);
+
+static PyObject *
+_asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+    PyObject *return_value = NULL;
+    static const char * const _keywords[] = {"loop", "task", NULL};
+    static _PyArg_Parser _parser = {"OO:_register_task", _keywords, 0};
+    PyObject *loop;
+    PyObject *task;
+
+    if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+        &loop, &task)) {
+        goto exit;
+    }
+    return_value = _asyncio__register_task_impl(module, loop, task);
+
+exit:
+    return return_value;
+}
+
+PyDoc_STRVAR(_asyncio__unregister_task__doc__,
+"_unregister_task($module, /, loop, task)\n"
+"--\n"
+"\n"
+"Unregister a task.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__UNREGISTER_TASK_METHODDEF    \
+    {"_unregister_task", (PyCFunction)_asyncio__unregister_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_task__doc__},
+
+static PyObject *
+_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
+                               PyObject *task);
+
+static PyObject *
+_asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+    PyObject *return_value = NULL;
+    static const char * const _keywords[] = {"loop", "task", NULL};
+    static _PyArg_Parser _parser = {"OO:_unregister_task", _keywords, 0};
+    PyObject *loop;
+    PyObject *task;
+
+    if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+        &loop, &task)) {
+        goto exit;
+    }
+    return_value = _asyncio__unregister_task_impl(module, loop, task);
+
+exit:
+    return return_value;
+}
+
+PyDoc_STRVAR(_asyncio__enter_task__doc__,
+"_enter_task($module, /, loop, task)\n"
+"--\n"
+"\n"
+"Enter into task execution or resume suspended task.\n"
+"\n"
+"Task belongs to loop.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__ENTER_TASK_METHODDEF    \
+    {"_enter_task", (PyCFunction)_asyncio__enter_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__enter_task__doc__},
+
+static PyObject *
+_asyncio__enter_task_impl(PyObject *module, PyObject *loop, PyObject *task);
+
+static PyObject *
+_asyncio__enter_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+    PyObject *return_value = NULL;
+    static const char * const _keywords[] = {"loop", "task", NULL};
+    static _PyArg_Parser _parser = {"OO:_enter_task", _keywords, 0};
+    PyObject *loop;
+    PyObject *task;
+
+    if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+        &loop, &task)) {
+        goto exit;
+    }
+    return_value = _asyncio__enter_task_impl(module, loop, task);
+
+exit:
+    return return_value;
+}
+
+PyDoc_STRVAR(_asyncio__leave_task__doc__,
+"_leave_task($module, /, loop, task)\n"
+"--\n"
+"\n"
+"Leave task execution or suspend a task.\n"
+"\n"
+"Task belongs to loop.\n"
+"\n"
+"Returns None.");
+
+#define _ASYNCIO__LEAVE_TASK_METHODDEF    \
+    {"_leave_task", (PyCFunction)_asyncio__leave_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__leave_task__doc__},
+
+static PyObject *
+_asyncio__leave_task_impl(PyObject *module, PyObject *loop, PyObject *task);
+
+static PyObject *
+_asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
+{
+    PyObject *return_value = NULL;
+    static const char * const _keywords[] = {"loop", "task", NULL};
+    static _PyArg_Parser _parser = {"OO:_leave_task", _keywords, 0};
+    PyObject *loop;
+    PyObject *task;
+
+    if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
+        &loop, &task)) {
+        goto exit;
+    }
+    return_value = _asyncio__leave_task_impl(module, loop, task);
+
+exit:
+    return return_value;
+}
+/*[clinic end generated code: output=0033af17965b51b4 input=a9049054013a1b77]*/



More information about the Python-checkins mailing list