[Python-checkins] cpython (3.4): Issue #22926: In debug mode, call_soon(), call_at() and call_later() methods of

victor.stinner python-checkins at python.org
Fri Dec 26 21:46:38 CET 2014


https://hg.python.org/cpython/rev/4e8ac4173b3c
changeset:   93973:4e8ac4173b3c
branch:      3.4
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Fri Dec 26 21:07:52 2014 +0100
summary:
  Issue #22926: In debug mode, call_soon(), call_at() and call_later() methods of
asyncio.BaseEventLoop now use the identifier of the current thread to ensure
that they are called from the thread running the event loop.

Before, the get_event_loop() method was used to check the thread, and no
exception was raised when the thread had no event loop. Now the methods always
raise an exception in debug mode when called from the wrong thread. It should
help to notice misusage of the API.

files:
  Lib/asyncio/base_events.py                    |  38 ++--
  Lib/asyncio/proactor_events.py                |   6 +-
  Lib/asyncio/selector_events.py                |   2 +-
  Lib/test/test_asyncio/test_base_events.py     |  76 +++++++--
  Lib/test/test_asyncio/test_proactor_events.py |   7 +-
  Lib/test/test_asyncio/test_subprocess.py      |  22 +--
  6 files changed, 89 insertions(+), 62 deletions(-)


diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -22,6 +22,7 @@
 import os
 import socket
 import subprocess
+import threading
 import time
 import traceback
 import sys
@@ -168,7 +169,9 @@
         self._scheduled = []
         self._default_executor = None
         self._internal_fds = 0
-        self._running = False
+        # Identifier of the thread running the event loop, or None if the
+        # event loop is not running
+        self._owner = None
         self._clock_resolution = time.get_clock_info('monotonic').resolution
         self._exception_handler = None
         self._debug = (not sys.flags.ignore_environment
@@ -246,9 +249,9 @@
     def run_forever(self):
         """Run until stop() is called."""
         self._check_closed()
-        if self._running:
+        if self.is_running():
             raise RuntimeError('Event loop is running.')
-        self._running = True
+        self._owner = threading.get_ident()
         try:
             while True:
                 try:
@@ -256,7 +259,7 @@
                 except _StopError:
                     break
         finally:
-            self._running = False
+            self._owner = None
 
     def run_until_complete(self, future):
         """Run until the Future is done.
@@ -311,7 +314,7 @@
 
         The event loop must not be running.
         """
-        if self._running:
+        if self.is_running():
             raise RuntimeError("Cannot close a running event loop")
         if self._closed:
             return
@@ -331,7 +334,7 @@
 
     def is_running(self):
         """Returns True if the event loop is running."""
-        return self._running
+        return (self._owner is not None)
 
     def time(self):
         """Return the time according to the event loop's clock.
@@ -373,7 +376,7 @@
             raise TypeError("coroutines cannot be used with call_at()")
         self._check_closed()
         if self._debug:
-            self._assert_is_current_event_loop()
+            self._check_thread()
         timer = events.TimerHandle(when, callback, args, self)
         if timer._source_traceback:
             del timer._source_traceback[-1]
@@ -391,17 +394,17 @@
         Any positional arguments after the callback will be passed to
         the callback when it is called.
         """
-        handle = self._call_soon(callback, args, check_loop=True)
+        if self._debug:
+            self._check_thread()
+        handle = self._call_soon(callback, args)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         return handle
 
-    def _call_soon(self, callback, args, check_loop):
+    def _call_soon(self, callback, args):
         if (coroutines.iscoroutine(callback)
         or coroutines.iscoroutinefunction(callback)):
             raise TypeError("coroutines cannot be used with call_soon()")
-        if self._debug and check_loop:
-            self._assert_is_current_event_loop()
         self._check_closed()
         handle = events.Handle(callback, args, self)
         if handle._source_traceback:
@@ -409,8 +412,8 @@
         self._ready.append(handle)
         return handle
 
-    def _assert_is_current_event_loop(self):
-        """Asserts that this event loop is the current event loop.
+    def _check_thread(self):
+        """Check that the current thread is the thread running the event loop.
 
         Non-thread-safe methods of this class make this assumption and will
         likely behave incorrectly when the assumption is violated.
@@ -418,18 +421,17 @@
         Should only be called when (self._debug == True).  The caller is
         responsible for checking this condition for performance reasons.
         """
-        try:
-            current = events.get_event_loop()
-        except RuntimeError:
+        if self._owner is None:
             return
-        if current is not self:
+        thread_id = threading.get_ident()
+        if thread_id != self._owner:
             raise RuntimeError(
                 "Non-thread-safe operation invoked on an event loop other "
                 "than the current one")
 
     def call_soon_threadsafe(self, callback, *args):
         """Like call_soon(), but thread-safe."""
-        handle = self._call_soon(callback, args, check_loop=False)
+        handle = self._call_soon(callback, args)
         if handle._source_traceback:
             del handle._source_traceback[-1]
         self._write_to_self()
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -383,7 +383,7 @@
                                            sock, protocol, waiter, extra)
 
     def close(self):
-        if self._running:
+        if self.is_running():
             raise RuntimeError("Cannot close a running event loop")
         if self.is_closed():
             return
@@ -432,9 +432,7 @@
         self._ssock.setblocking(False)
         self._csock.setblocking(False)
         self._internal_fds += 1
-        # don't check the current loop because _make_self_pipe() is called
-        # from the event loop constructor
-        self._call_soon(self._loop_self_reading, (), check_loop=False)
+        self.call_soon(self._loop_self_reading)
 
     def _loop_self_reading(self, f=None):
         try:
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -68,7 +68,7 @@
                                           address, waiter, extra)
 
     def close(self):
-        if self._running:
+        if self.is_running():
             raise RuntimeError("Cannot close a running event loop")
         if self.is_closed():
             return
diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py
--- a/Lib/test/test_asyncio/test_base_events.py
+++ b/Lib/test/test_asyncio/test_base_events.py
@@ -5,6 +5,7 @@
 import math
 import socket
 import sys
+import threading
 import time
 import unittest
 from unittest import mock
@@ -148,28 +149,71 @@
         # are really slow
         self.assertLessEqual(dt, 0.9, dt)
 
-    def test_assert_is_current_event_loop(self):
+    def check_thread(self, loop, debug):
         def cb():
             pass
 
-        other_loop = base_events.BaseEventLoop()
-        other_loop._selector = mock.Mock()
-        asyncio.set_event_loop(other_loop)
+        loop.set_debug(debug)
+        if debug:
+            msg = ("Non-thread-safe operation invoked on an event loop other "
+                  "than the current one")
+            with self.assertRaisesRegex(RuntimeError, msg):
+                loop.call_soon(cb)
+            with self.assertRaisesRegex(RuntimeError, msg):
+                loop.call_later(60, cb)
+            with self.assertRaisesRegex(RuntimeError, msg):
+                loop.call_at(loop.time() + 60, cb)
+        else:
+            loop.call_soon(cb)
+            loop.call_later(60, cb)
+            loop.call_at(loop.time() + 60, cb)
 
-        # raise RuntimeError if the event loop is different in debug mode
-        self.loop.set_debug(True)
-        with self.assertRaises(RuntimeError):
-            self.loop.call_soon(cb)
-        with self.assertRaises(RuntimeError):
-            self.loop.call_later(60, cb)
-        with self.assertRaises(RuntimeError):
-            self.loop.call_at(self.loop.time() + 60, cb)
+    def test_check_thread(self):
+        def check_in_thread(loop, event, debug, create_loop, fut):
+            # wait until the event loop is running
+            event.wait()
+
+            try:
+                if create_loop:
+                    loop2 = base_events.BaseEventLoop()
+                    try:
+                        asyncio.set_event_loop(loop2)
+                        self.check_thread(loop, debug)
+                    finally:
+                        asyncio.set_event_loop(None)
+                        loop2.close()
+                else:
+                    self.check_thread(loop, debug)
+            except Exception as exc:
+                loop.call_soon_threadsafe(fut.set_exception, exc)
+            else:
+                loop.call_soon_threadsafe(fut.set_result, None)
+
+        def test_thread(loop, debug, create_loop=False):
+            event = threading.Event()
+            fut = asyncio.Future(loop=loop)
+            loop.call_soon(event.set)
+            args = (loop, event, debug, create_loop, fut)
+            thread = threading.Thread(target=check_in_thread, args=args)
+            thread.start()
+            loop.run_until_complete(fut)
+            thread.join()
+
+        self.loop._process_events = mock.Mock()
+        self.loop._write_to_self = mock.Mock()
+
+        # raise RuntimeError if the thread has no event loop
+        test_thread(self.loop, True)
 
         # check disabled if debug mode is disabled
-        self.loop.set_debug(False)
-        self.loop.call_soon(cb)
-        self.loop.call_later(60, cb)
-        self.loop.call_at(self.loop.time() + 60, cb)
+        test_thread(self.loop, False)
+
+        # raise RuntimeError if the event loop of the thread is not the called
+        # event loop
+        test_thread(self.loop, True, create_loop=True)
+
+        # check disabled if debug mode is disabled
+        test_thread(self.loop, False, create_loop=True)
 
     def test_run_once_in_executor_handle(self):
         def cb():
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -440,17 +440,16 @@
         self.loop = EventLoop(self.proactor)
         self.set_event_loop(self.loop, cleanup=False)
 
-    @mock.patch.object(BaseProactorEventLoop, '_call_soon')
+    @mock.patch.object(BaseProactorEventLoop, 'call_soon')
     @mock.patch.object(BaseProactorEventLoop, '_socketpair')
-    def test_ctor(self, socketpair, _call_soon):
+    def test_ctor(self, socketpair, call_soon):
         ssock, csock = socketpair.return_value = (
             mock.Mock(), mock.Mock())
         loop = BaseProactorEventLoop(self.proactor)
         self.assertIs(loop._ssock, ssock)
         self.assertIs(loop._csock, csock)
         self.assertEqual(loop._internal_fds, 1)
-        _call_soon.assert_called_with(loop._loop_self_reading, (),
-                                      check_loop=False)
+        call_soon.assert_called_with(loop._loop_self_reading)
 
     def test_close_self_pipe(self):
         self.loop._close_self_pipe()
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -233,19 +233,12 @@
         def setUp(self):
             policy = asyncio.get_event_loop_policy()
             self.loop = policy.new_event_loop()
-
-            # ensure that the event loop is passed explicitly in asyncio
-            policy.set_event_loop(None)
+            self.set_event_loop(self.loop)
 
             watcher = self.Watcher()
             watcher.attach_loop(self.loop)
             policy.set_child_watcher(watcher)
-
-        def tearDown(self):
-            policy = asyncio.get_event_loop_policy()
-            policy.set_child_watcher(None)
-            self.loop.close()
-            super().tearDown()
+            self.addCleanup(policy.set_child_watcher, None)
 
     class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
                                      test_utils.TestCase):
@@ -262,17 +255,8 @@
     class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
 
         def setUp(self):
-            policy = asyncio.get_event_loop_policy()
             self.loop = asyncio.ProactorEventLoop()
-
-            # ensure that the event loop is passed explicitly in asyncio
-            policy.set_event_loop(None)
-
-        def tearDown(self):
-            policy = asyncio.get_event_loop_policy()
-            self.loop.close()
-            policy.set_event_loop(None)
-            super().tearDown()
+            self.set_event_loop(self.loop)
 
 
 if __name__ == '__main__':

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list