[Python-checkins] cpython: asyncio: Fix get_event_loop() to call set_event_loop() when setting the loop.

guido.van.rossum python-checkins at python.org
Wed Nov 27 19:37:21 CET 2013


http://hg.python.org/cpython/rev/5c9af8194d3b
changeset:   87614:5c9af8194d3b
user:        Guido van Rossum <guido at python.org>
date:        Wed Nov 27 10:37:13 2013 -0800
summary:
  asyncio: Fix get_event_loop() to call set_event_loop() when setting the loop. By Anthony Baire.

files:
  Lib/asyncio/events.py                |   2 +-
  Lib/test/test_asyncio/test_events.py |  16 ++++++++++++++++
  2 files changed, 17 insertions(+), 1 deletions(-)


diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -360,7 +360,7 @@
         if (self._local._loop is None and
             not self._local._set_called and
             isinstance(threading.current_thread(), threading._MainThread)):
-            self._local._loop = self.new_event_loop()
+            self.set_event_loop(self.new_event_loop())
         assert self._local._loop is not None, \
                ('There is no current event loop in thread %r.' %
                 threading.current_thread().name)
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1599,6 +1599,22 @@
         self.assertIs(loop, policy.get_event_loop())
         loop.close()
 
+    def test_get_event_loop_calls_set_event_loop(self):
+        policy = self.create_policy()
+
+        with unittest.mock.patch.object(
+                policy, "set_event_loop",
+                wraps=policy.set_event_loop) as m_set_event_loop:
+
+            loop = policy.get_event_loop()
+
+            # policy._local._loop must be set through .set_event_loop()
+            # (the unix DefaultEventLoopPolicy needs this call to attach
+            # the child watcher correctly)
+            m_set_event_loop.assert_called_with(loop)
+
+        loop.close()
+
     def test_get_event_loop_after_set_none(self):
         policy = self.create_policy()
         policy.set_event_loop(None)

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list