[Python-checkins] bpo-47062: Implement asyncio.Runner context manager (GH-31799)

asvetlov webhook-mailer at python.org
Thu Mar 24 15:51:27 EDT 2022


https://github.com/python/cpython/commit/4119d2d7c9e25acd4f16994fb92d656f8b7816d7
commit: 4119d2d7c9e25acd4f16994fb92d656f8b7816d7
branch: main
author: Andrew Svetlov <andrew.svetlov at gmail.com>
committer: asvetlov <andrew.svetlov at gmail.com>
date: 2022-03-24T21:51:16+02:00
summary:

bpo-47062: Implement asyncio.Runner context manager (GH-31799)

Co-authored-by: Zachary Ware <zach at python.org>

files:
A Doc/library/asyncio-runner.rst
A Misc/NEWS.d/next/Library/2022-03-18-22-46-18.bpo-47062.RNc99_.rst
M Doc/library/asyncio-task.rst
M Doc/library/asyncio.rst
M Lib/asyncio/runners.py
M Lib/test/test_asyncio/test_runners.py
M Lib/unittest/async_case.py

diff --git a/Doc/library/asyncio-runner.rst b/Doc/library/asyncio-runner.rst
new file mode 100644
index 0000000000000..2f4de9edaa400
--- /dev/null
+++ b/Doc/library/asyncio-runner.rst
@@ -0,0 +1,121 @@
+.. currentmodule:: asyncio
+
+
+=======
+Runners
+=======
+
+**Source code:** :source:`Lib/asyncio/runners.py`
+
+
+This section outlines high-level asyncio primitives to run asyncio code.
+
+They are built on top of an :ref:`event loop <asyncio-event-loop>` with the aim
+of simplifying async code usage for common, widespread scenarios.
+
+.. contents::
+   :depth: 1
+   :local:
+
+
+
+Running an asyncio Program
+==========================
+
+.. function:: run(coro, *, debug=None)
+
+   Execute the :term:`coroutine` *coro* and return the result.
+
+   This function runs the passed coroutine, taking care of
+   managing the asyncio event loop, *finalizing asynchronous
+   generators*, and closing the threadpool.
+
+   This function cannot be called when another asyncio event loop is
+   running in the same thread.
+
+   If *debug* is ``True``, the event loop will be run in debug mode. ``False`` disables
+   debug mode explicitly. ``None`` is used to respect the global
+   :ref:`asyncio-debug-mode` settings.
+
+   This function always creates a new event loop and closes it at
+   the end.  It should be used as a main entry point for asyncio
+   programs, and should ideally only be called once.
+
+   Example::
+
+       async def main():
+           await asyncio.sleep(1)
+           print('hello')
+
+       asyncio.run(main())
+
+   .. versionadded:: 3.7
+
+   .. versionchanged:: 3.9
+      Updated to use :meth:`loop.shutdown_default_executor`.
+
+   .. versionchanged:: 3.10
+
+      *debug* is ``None`` by default to respect the global debug mode settings.
+
+
+Runner context manager
+======================
+
+.. class:: Runner(*, debug=None, factory=None)
+
+   A context manager that simplifies *multiple* async function calls in the same
+   context.
+
+   Sometimes several top-level async functions should be called in the same :ref:`event
+   loop <asyncio-event-loop>` and :class:`contextvars.Context`.
+
+   If *debug* is ``True``, the event loop will be run in debug mode. ``False`` disables
+   debug mode explicitly. ``None`` is used to respect the global
+   :ref:`asyncio-debug-mode` settings.
+
+   *factory* can be used to override the loop creation.
+   :func:`asyncio.new_event_loop` is used if *factory* is ``None``.
+
+   The :func:`asyncio.run` example can be rewritten using the runner::
+
+        async def main():
+            await asyncio.sleep(1)
+            print('hello')
+
+        with asyncio.Runner() as runner:
+            runner.run(main())
+
+   .. versionadded:: 3.11
+
+   .. method:: run(coro, *, context=None)
+
+      Run a :term:`coroutine <coroutine>` *coro* in the embedded loop.
+
+      Return the coroutine's result or raise its exception.
+
+      An optional keyword-only *context* argument allows specifying a
+      custom :class:`contextvars.Context` for the *coro* to run in.
+      The runner's default context is used if ``None``.
+
+      This function cannot be called when another asyncio event loop is
+      running in the same thread.
+
+   .. method:: close()
+
+      Close the runner.
+
+      Finalize asynchronous generators, shut down the default executor, close the event
+      loop, and release the embedded :class:`contextvars.Context`.
+
+   .. method:: get_loop()
+
+      Return the event loop associated with the runner instance.
+
+   .. note::
+
+      :class:`Runner` uses a lazy initialization strategy: its constructor doesn't
+      initialize the underlying low-level structures.
+
+      The embedded *loop* and *context* are created on entering the :keyword:`with` body
+      or on the first call to :meth:`run` or :meth:`get_loop`.
diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst
index 21a4cb5820bd1..c104ac5b9a3b8 100644
--- a/Doc/library/asyncio-task.rst
+++ b/Doc/library/asyncio-task.rst
@@ -204,43 +204,6 @@ A good example of a low-level function that returns a Future object
 is :meth:`loop.run_in_executor`.
 
 
-Running an asyncio Program
-==========================
-
-.. function:: run(coro, *, debug=False)
-
-    Execute the :term:`coroutine` *coro* and return the result.
-
-    This function runs the passed coroutine, taking care of
-    managing the asyncio event loop, *finalizing asynchronous
-    generators*, and closing the threadpool.
-
-    This function cannot be called when another asyncio event loop is
-    running in the same thread.
-
-    If *debug* is ``True``, the event loop will be run in debug mode.
-
-    This function always creates a new event loop and closes it at
-    the end.  It should be used as a main entry point for asyncio
-    programs, and should ideally only be called once.
-
-    Example::
-
-        async def main():
-            await asyncio.sleep(1)
-            print('hello')
-
-        asyncio.run(main())
-
-    .. versionadded:: 3.7
-
-    .. versionchanged:: 3.9
-       Updated to use :meth:`loop.shutdown_default_executor`.
-
-    .. note::
-       The source code for ``asyncio.run()`` can be found in
-       :source:`Lib/asyncio/runners.py`.
-
 Creating Tasks
 ==============
 
diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst
index 94a853259d348..8b3a060ffad52 100644
--- a/Doc/library/asyncio.rst
+++ b/Doc/library/asyncio.rst
@@ -67,6 +67,7 @@ Additionally, there are **low-level** APIs for
    :caption: High-level APIs
    :maxdepth: 1
 
+   asyncio-runner.rst
    asyncio-task.rst
    asyncio-stream.rst
    asyncio-sync.rst
diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py
index 9a5e9a48479ef..975509c7d645d 100644
--- a/Lib/asyncio/runners.py
+++ b/Lib/asyncio/runners.py
@@ -1,10 +1,112 @@
-__all__ = 'run',
+__all__ = ('Runner', 'run')
 
+import contextvars
+import enum
 from . import coroutines
 from . import events
 from . import tasks
 
 
+class _State(enum.Enum):
+    CREATED = "created"
+    INITIALIZED = "initialized"
+    CLOSED = "closed"
+
+
+class Runner:
+    """A context manager that controls event loop life cycle.
+
+    The context manager always creates a new event loop,
+    allows running async functions inside it,
+    and properly finalizes the loop at the context manager exit.
+
+    If debug is True, the event loop will be run in debug mode.
+    If factory is passed, it is used for new event loop creation.
+
+    asyncio.run(main(), debug=True)
+
+    is a shortcut for
+
+    with asyncio.Runner(debug=True) as runner:
+        runner.run(main())
+
+    The run() method can be called multiple times within the runner's context.
+
+    This can be useful for interactive consoles (e.g. IPython),
+    unittest runners, and console tools: anywhere async code
+    is called from an existing sync framework and where the preferred single
+    asyncio.run() call doesn't work.
+
+    """
+
+    # Note: the class is final, it is not intended for inheritance.
+
+    def __init__(self, *, debug=None, factory=None):
+        self._state = _State.CREATED
+        self._debug = debug
+        self._factory = factory
+        self._loop = None
+        self._context = None
+
+    def __enter__(self):
+        self._lazy_init()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def close(self):
+        """Shut down and close the event loop."""
+        if self._state is not _State.INITIALIZED:
+            return
+        try:
+            loop = self._loop
+            _cancel_all_tasks(loop)
+            loop.run_until_complete(loop.shutdown_asyncgens())
+            loop.run_until_complete(loop.shutdown_default_executor())
+        finally:
+            loop.close()
+            self._loop = None
+            self._state = _State.CLOSED
+
+    def get_loop(self):
+        """Return embedded event loop."""
+        self._lazy_init()
+        return self._loop
+
+    def run(self, coro, *, context=None):
+        """Run a coroutine inside the embedded event loop."""
+        if not coroutines.iscoroutine(coro):
+            raise ValueError("a coroutine was expected, got {!r}".format(coro))
+
+        if events._get_running_loop() is not None:
+            # fail fast with short traceback
+            raise RuntimeError(
+                "Runner.run() cannot be called from a running event loop")
+
+        self._lazy_init()
+
+        if context is None:
+            context = self._context
+        task = self._loop.create_task(coro, context=context)
+        return self._loop.run_until_complete(task)
+
+    def _lazy_init(self):
+        if self._state is _State.CLOSED:
+            raise RuntimeError("Runner is closed")
+        if self._state is _State.INITIALIZED:
+            return
+        if self._factory is None:
+            self._loop = events.new_event_loop()
+        else:
+            self._loop = self._factory()
+        if self._debug is not None:
+            self._loop.set_debug(self._debug)
+        self._context = contextvars.copy_context()
+        self._state = _State.INITIALIZED
+
+
+
 def run(main, *, debug=None):
     """Execute the coroutine and return the result.
 
@@ -30,26 +132,12 @@ async def main():
         asyncio.run(main())
     """
     if events._get_running_loop() is not None:
+        # fail fast with short traceback
         raise RuntimeError(
             "asyncio.run() cannot be called from a running event loop")
 
-    if not coroutines.iscoroutine(main):
-        raise ValueError("a coroutine was expected, got {!r}".format(main))
-
-    loop = events.new_event_loop()
-    try:
-        events.set_event_loop(loop)
-        if debug is not None:
-            loop.set_debug(debug)
-        return loop.run_until_complete(main)
-    finally:
-        try:
-            _cancel_all_tasks(loop)
-            loop.run_until_complete(loop.shutdown_asyncgens())
-            loop.run_until_complete(loop.shutdown_default_executor())
-        finally:
-            events.set_event_loop(None)
-            loop.close()
+    with Runner(debug=debug) as runner:
+        return runner.run(main)
 
 
 def _cancel_all_tasks(loop):
diff --git a/Lib/test/test_asyncio/test_runners.py b/Lib/test/test_asyncio/test_runners.py
index 112273662b20b..2919412ab81db 100644
--- a/Lib/test/test_asyncio/test_runners.py
+++ b/Lib/test/test_asyncio/test_runners.py
@@ -1,4 +1,7 @@
 import asyncio
+import contextvars
+import gc
+import re
 import unittest
 
 from unittest import mock
@@ -186,5 +189,135 @@ async def main():
         self.assertFalse(spinner.ag_running)
 
 
+class RunnerTests(BaseTest):
+
+    def test_non_debug(self):
+        with asyncio.Runner(debug=False) as runner:
+            self.assertFalse(runner.get_loop().get_debug())
+
+    def test_debug(self):
+        with asyncio.Runner(debug=True) as runner:
+            self.assertTrue(runner.get_loop().get_debug())
+
+    def test_custom_factory(self):
+        loop = mock.Mock()
+        with asyncio.Runner(factory=lambda: loop) as runner:
+            self.assertIs(runner.get_loop(), loop)
+
+    def test_run(self):
+        async def f():
+            await asyncio.sleep(0)
+            return 'done'
+
+        with asyncio.Runner() as runner:
+            self.assertEqual('done', runner.run(f()))
+            loop = runner.get_loop()
+
+        with self.assertRaisesRegex(
+            RuntimeError,
+            "Runner is closed"
+        ):
+            runner.get_loop()
+
+        self.assertTrue(loop.is_closed())
+
+    def test_run_non_coro(self):
+        with asyncio.Runner() as runner:
+            with self.assertRaisesRegex(
+                ValueError,
+                "a coroutine was expected"
+            ):
+                runner.run(123)
+
+    def test_run_future(self):
+        with asyncio.Runner() as runner:
+            with self.assertRaisesRegex(
+                ValueError,
+                "a coroutine was expected"
+            ):
+                fut = runner.get_loop().create_future()
+                runner.run(fut)
+
+    def test_explicit_close(self):
+        runner = asyncio.Runner()
+        loop = runner.get_loop()
+        runner.close()
+        with self.assertRaisesRegex(
+                RuntimeError,
+                "Runner is closed"
+        ):
+            runner.get_loop()
+
+        self.assertTrue(loop.is_closed())
+
+    def test_double_close(self):
+        runner = asyncio.Runner()
+        loop = runner.get_loop()
+
+        runner.close()
+        self.assertTrue(loop.is_closed())
+
+        # the second call is a no-op
+        runner.close()
+        self.assertTrue(loop.is_closed())
+
+    def test_second_with_block_raises(self):
+        ret = []
+
+        async def f(arg):
+            ret.append(arg)
+
+        runner = asyncio.Runner()
+        with runner:
+            runner.run(f(1))
+
+        with self.assertRaisesRegex(
+            RuntimeError,
+            "Runner is closed"
+        ):
+            with runner:
+                runner.run(f(2))
+
+        self.assertEqual([1], ret)
+
+    def test_run_keeps_context(self):
+        cvar = contextvars.ContextVar("cvar", default=-1)
+
+        async def f(val):
+            old = cvar.get()
+            await asyncio.sleep(0)
+            cvar.set(val)
+            return old
+
+        async def get_context():
+            return contextvars.copy_context()
+
+        with asyncio.Runner() as runner:
+            self.assertEqual(-1, runner.run(f(1)))
+            self.assertEqual(1, runner.run(f(2)))
+
+            self.assertEqual({cvar: 2}, dict(runner.run(get_context())))
+
+    def test_recursive_run(self):
+        async def g():
+            pass
+
+        async def f():
+            runner.run(g())
+
+        with asyncio.Runner() as runner:
+            with self.assertWarnsRegex(
+                RuntimeWarning,
+                "coroutine .+ was never awaited",
+            ):
+                with self.assertRaisesRegex(
+                    RuntimeError,
+                    re.escape(
+                        "Runner.run() cannot be called from a running event loop"
+                    ),
+                ):
+                    runner.run(f())
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py
index 25adc3deff63d..85b938fb293af 100644
--- a/Lib/unittest/async_case.py
+++ b/Lib/unittest/async_case.py
@@ -34,7 +34,7 @@ class IsolatedAsyncioTestCase(TestCase):
 
     def __init__(self, methodName='runTest'):
         super().__init__(methodName)
-        self._asyncioTestLoop = None
+        self._asyncioRunner = None
         self._asyncioTestContext = contextvars.copy_context()
 
     async def asyncSetUp(self):
@@ -75,76 +75,44 @@ def _callCleanup(self, function, *args, **kwargs):
         self._callMaybeAsync(function, *args, **kwargs)
 
     def _callAsync(self, func, /, *args, **kwargs):
-        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+        assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
         assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
-        task = self._asyncioTestLoop.create_task(
+        return self._asyncioRunner.run(
             func(*args, **kwargs),
-            context=self._asyncioTestContext,
+            context=self._asyncioTestContext
         )
-        return self._asyncioTestLoop.run_until_complete(task)
 
     def _callMaybeAsync(self, func, /, *args, **kwargs):
-        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
+        assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
         if inspect.iscoroutinefunction(func):
-            task = self._asyncioTestLoop.create_task(
+            return self._asyncioRunner.run(
                 func(*args, **kwargs),
                 context=self._asyncioTestContext,
             )
-            return self._asyncioTestLoop.run_until_complete(task)
         else:
             return self._asyncioTestContext.run(func, *args, **kwargs)
 
-    def _setupAsyncioLoop(self):
-        assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        loop.set_debug(True)
-        self._asyncioTestLoop = loop
+    def _setupAsyncioRunner(self):
+        assert self._asyncioRunner is None, 'asyncio runner is already initialized'
+        runner = asyncio.Runner(debug=True)
+        self._asyncioRunner = runner
 
-    def _tearDownAsyncioLoop(self):
-        assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
-        loop = self._asyncioTestLoop
-        self._asyncioTestLoop = None
-
-        try:
-            # cancel all tasks
-            to_cancel = asyncio.all_tasks(loop)
-            if not to_cancel:
-                return
-
-            for task in to_cancel:
-                task.cancel()
-
-            loop.run_until_complete(
-                asyncio.gather(*to_cancel, return_exceptions=True))
-
-            for task in to_cancel:
-                if task.cancelled():
-                    continue
-                if task.exception() is not None:
-                    loop.call_exception_handler({
-                        'message': 'unhandled exception during test shutdown',
-                        'exception': task.exception(),
-                        'task': task,
-                    })
-            # shutdown asyncgens
-            loop.run_until_complete(loop.shutdown_asyncgens())
-        finally:
-            asyncio.set_event_loop(None)
-            loop.close()
+    def _tearDownAsyncioRunner(self):
+        runner = self._asyncioRunner
+        runner.close()
 
     def run(self, result=None):
-        self._setupAsyncioLoop()
+        self._setupAsyncioRunner()
         try:
             return super().run(result)
         finally:
-            self._tearDownAsyncioLoop()
+            self._tearDownAsyncioRunner()
 
     def debug(self):
-        self._setupAsyncioLoop()
+        self._setupAsyncioRunner()
         super().debug()
-        self._tearDownAsyncioLoop()
+        self._tearDownAsyncioRunner()
 
     def __del__(self):
-        if self._asyncioTestLoop is not None:
-            self._tearDownAsyncioLoop()
+        if self._asyncioRunner is not None:
+            self._tearDownAsyncioRunner()
diff --git a/Misc/NEWS.d/next/Library/2022-03-18-22-46-18.bpo-47062.RNc99_.rst b/Misc/NEWS.d/next/Library/2022-03-18-22-46-18.bpo-47062.RNc99_.rst
new file mode 100644
index 0000000000000..7d5bfc114a8d1
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-03-18-22-46-18.bpo-47062.RNc99_.rst
@@ -0,0 +1 @@
+Implement :class:`asyncio.Runner` context manager.


