[Python-checkins] cpython (merge 3.4 -> 3.5): Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel. (Merge

guido.van.rossum python-checkins at python.org
Sat Oct 3 11:35:55 EDT 2015


https://hg.python.org/cpython/rev/e0db10d8c95e
changeset:   98522:e0db10d8c95e
branch:      3.5
parent:      98519:6e43a3833293
parent:      98521:25e05b3e1869
user:        Guido van Rossum <guido at python.org>
date:        Sat Oct 03 08:34:34 2015 -0700
summary:
  Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel. (Merge 3.4->3.5.)

files:
  Lib/asyncio/futures.py                |  74 +++++++++++---
  Lib/asyncio/tasks.py                  |  18 +++-
  Lib/test/test_asyncio/test_futures.py |   2 -
  Lib/test/test_asyncio/test_tasks.py   |  67 +++++++++++++
  Misc/ACKS                             |   1 +
  Misc/NEWS                             |   4 +
  6 files changed, 147 insertions(+), 19 deletions(-)


diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -390,22 +390,64 @@
         __await__ = __iter__ # make compatible with 'await' expression
 
 
-def wrap_future(fut, *, loop=None):
+def _set_concurrent_future_state(concurrent, source):
+    """Copy state from a future to a concurrent.futures.Future."""
+    assert source.done()
+    if source.cancelled():
+        concurrent.cancel()
+    if not concurrent.set_running_or_notify_cancel():
+        return
+    exception = source.exception()
+    if exception is not None:
+        concurrent.set_exception(exception)
+    else:
+        result = source.result()
+        concurrent.set_result(result)
+
+
+def _chain_future(source, destination):
+    """Chain two futures so that when one completes, so does the other.
+
+    The result (or exception) of source will be copied to destination.
+    If destination is cancelled, source gets cancelled too.
+    Compatible with both asyncio.Future and concurrent.futures.Future.
+    """
+    if not isinstance(source, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for source argument')
+    if not isinstance(destination, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for destination argument')
+    source_loop = source._loop if isinstance(source, Future) else None
+    dest_loop = destination._loop if isinstance(destination, Future) else None
+
+    def _set_state(future, other):
+        if isinstance(future, Future):
+            future._copy_state(other)
+        else:
+            _set_concurrent_future_state(future, other)
+
+    def _call_check_cancel(destination):
+        if destination.cancelled():
+            if source_loop is None or source_loop is dest_loop:
+                source.cancel()
+            else:
+                source_loop.call_soon_threadsafe(source.cancel)
+
+    def _call_set_state(source):
+        if dest_loop is None or dest_loop is source_loop:
+            _set_state(destination, source)
+        else:
+            dest_loop.call_soon_threadsafe(_set_state, destination, source)
+
+    destination.add_done_callback(_call_check_cancel)
+    source.add_done_callback(_call_set_state)
+
+
+def wrap_future(future, *, loop=None):
     """Wrap concurrent.futures.Future object."""
-    if isinstance(fut, Future):
-        return fut
-    assert isinstance(fut, concurrent.futures.Future), \
-        'concurrent.futures.Future is expected, got {!r}'.format(fut)
-    if loop is None:
-        loop = events.get_event_loop()
+    if isinstance(future, Future):
+        return future
+    assert isinstance(future, concurrent.futures.Future), \
+        'concurrent.futures.Future is expected, got {!r}'.format(future)
     new_future = Future(loop=loop)
-
-    def _check_cancel_other(f):
-        if f.cancelled():
-            fut.cancel()
-
-    new_future.add_done_callback(_check_cancel_other)
-    fut.add_done_callback(
-        lambda future: loop.call_soon_threadsafe(
-            new_future._copy_state, future))
+    _chain_future(future, new_future)
     return new_future
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -3,7 +3,7 @@
 __all__ = ['Task',
            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
-           'gather', 'shield', 'ensure_future',
+           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
            ]
 
 import concurrent.futures
@@ -692,3 +692,19 @@
 
     inner.add_done_callback(_done_callback)
     return outer
+
+
+def run_coroutine_threadsafe(coro, loop):
+    """Submit a coroutine object to a given event loop.
+
+    Return a concurrent.futures.Future to access the result.
+    """
+    if not coroutines.iscoroutine(coro):
+        raise TypeError('A coroutine object is required')
+    future = concurrent.futures.Future()
+
+    def callback():
+        futures._chain_future(ensure_future(coro, loop=loop), future)
+
+    loop.call_soon_threadsafe(callback)
+    return future
diff --git a/Lib/test/test_asyncio/test_futures.py b/Lib/test/test_asyncio/test_futures.py
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -174,8 +174,6 @@
                          '<Future cancelled>')
 
     def test_copy_state(self):
-        # Test the internal _copy_state method since it's being directly
-        # invoked in other modules.
         f = asyncio.Future(loop=self.loop)
         f.set_result(10)
 
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -2100,5 +2100,72 @@
         self.assertIsInstance(f.exception(), RuntimeError)
 
 
+class RunCoroutineThreadsafeTests(test_utils.TestCase):
+    """Test case for futures.submit_to_loop."""
+
+    def setUp(self):
+        self.loop = self.new_test_loop(self.time_gen)
+
+    def time_gen(self):
+        """Handle the timer."""
+        yield 0  # second
+        yield 1  # second
+
+    @asyncio.coroutine
+    def add(self, a, b, fail=False, cancel=False):
+        """Wait 1 second and return a + b."""
+        yield from asyncio.sleep(1, loop=self.loop)
+        if fail:
+            raise RuntimeError("Fail!")
+        if cancel:
+            asyncio.tasks.Task.current_task(self.loop).cancel()
+            yield
+        return a + b
+
+    def target(self, fail=False, cancel=False, timeout=None):
+        """Run add coroutine in the event loop."""
+        coro = self.add(1, 2, fail=fail, cancel=cancel)
+        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
+        try:
+            return future.result(timeout)
+        finally:
+            future.done() or future.cancel()
+
+    def test_run_coroutine_threadsafe(self):
+        """Test coroutine submission from a thread to an event loop."""
+        future = self.loop.run_in_executor(None, self.target)
+        result = self.loop.run_until_complete(future)
+        self.assertEqual(result, 3)
+
+    def test_run_coroutine_threadsafe_with_exception(self):
+        """Test coroutine submission from a thread to an event loop
+        when an exception is raised."""
+        future = self.loop.run_in_executor(None, self.target, True)
+        with self.assertRaises(RuntimeError) as exc_context:
+            self.loop.run_until_complete(future)
+        self.assertIn("Fail!", exc_context.exception.args)
+
+    def test_run_coroutine_threadsafe_with_timeout(self):
+        """Test coroutine submission from a thread to an event loop
+        when a timeout is raised."""
+        callback = lambda: self.target(timeout=0)
+        future = self.loop.run_in_executor(None, callback)
+        with self.assertRaises(asyncio.TimeoutError):
+            self.loop.run_until_complete(future)
+        # Clear the time generator and tasks
+        test_utils.run_briefly(self.loop)
+        # Check that there's no pending task (add has been cancelled)
+        for task in asyncio.Task.all_tasks(self.loop):
+            self.assertTrue(task.done())
+
+    def test_run_coroutine_threadsafe_task_cancelled(self):
+        """Test coroutine submission from a tread to an event loop
+        when the task is cancelled."""
+        callback = lambda: self.target(cancel=True)
+        future = self.loop.run_in_executor(None, callback)
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(future)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/Misc/ACKS b/Misc/ACKS
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -957,6 +957,7 @@
 Trent Mick
 Jason Michalski
 Franck Michea
+Vincent Michel
 Tom Middleton
 Thomas Miedema
 Stan Mihai
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -34,6 +34,10 @@
 Library
 -------
 
+- Issue #25304: Add asyncio.run_coroutine_threadsafe().  This lets you
+  submit a coroutine to a loop from another thread, returning a
+  concurrent.futures.Future.  By Vincent Michel.
+
 - Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
   first question mark (?) rather than the last. Patch from Xiang Zhang.
 

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list