[Python-checkins] cpython (3.4): asyncio: sync with Tulip

victor.stinner python-checkins at python.org
Fri Jul 25 00:58:09 CEST 2014


http://hg.python.org/cpython/rev/ea1057bc60ab
changeset:   91840:ea1057bc60ab
branch:      3.4
parent:      91836:3f08c1156050
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Fri Jul 25 00:54:53 2014 +0200
summary:
  asyncio: sync with Tulip

Improve stability of the proactor event loop, especially operations on
overlapped objects:

* Tulip issue 195: Don't call UnregisterWait() twice if a _WaitHandleFuture is
  cancelled twice to fix a crash.
* IocpProactor.close(): cancel futures to cancel overlapped operations, instead
  of cancelling directly overlapped operations. Future objects may not call
  ov.cancel() if the future was cancelled or if the overlapped was already
  cancelled. The cancel() method of the future may also catch exceptions. Log
  also errors on cancellation.
* tests: rename "f" to "fut"
* Add a __repr__() method to IocpProactor
* Add a destructor to IocpProactor which closes it
* _OverlappedFuture.cancel() doesn't cancel the overlapped anymore if it is
  done: if it is already cancelled or completed. Log also an error if the
  cancellation failed.
* Add the address of the overlapped object in repr(_OverlappedFuture)
* _OverlappedFuture truncates the source traceback to hide the call to the
  parent constructor (useless in debug).

files:
  Lib/asyncio/windows_events.py                |  66 ++++++---
  Lib/test/test_asyncio/test_windows_events.py |  30 +++-
  2 files changed, 66 insertions(+), 30 deletions(-)


diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -38,14 +38,14 @@
 
     def __init__(self, ov, *, loop=None):
         super().__init__(loop=loop)
+        if self._source_traceback:
+            del self._source_traceback[-1]
         self.ov = ov
 
     def __repr__(self):
         info = [self._state.lower()]
-        if self.ov.pending:
-            info.append('overlapped=pending')
-        else:
-            info.append('overlapped=completed')
+        state = 'pending' if self.ov.pending else 'completed'
+        info.append('overlapped=<%s, %#x>' % (state, self.ov.address))
         if self._state == futures._FINISHED:
             info.append(self._format_result())
         if self._callbacks:
@@ -53,10 +53,18 @@
         return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
 
     def cancel(self):
-        try:
-            self.ov.cancel()
-        except OSError:
-            pass
+        if not self.done():
+            try:
+                self.ov.cancel()
+            except OSError as exc:
+                context = {
+                    'message': 'Cancelling an overlapped future failed',
+                    'exception': exc,
+                    'future': self,
+                }
+                if self._source_traceback:
+                    context['source_traceback'] = self._source_traceback
+                self._loop.call_exception_handler(context)
         return super().cancel()
 
 
@@ -67,13 +75,20 @@
         super().__init__(loop=loop)
         self._wait_handle = wait_handle
 
-    def cancel(self):
-        super().cancel()
+    def _unregister(self):
+        if self._wait_handle is None:
+            return
         try:
             _overlapped.UnregisterWait(self._wait_handle)
         except OSError as e:
             if e.winerror != _overlapped.ERROR_IO_PENDING:
                 raise
+            # ERROR_IO_PENDING is not an error, the wait was unregistered
+        self._wait_handle = None
+
+    def cancel(self):
+        self._unregister()
+        super().cancel()
 
 
 class PipeServer(object):
@@ -208,6 +223,11 @@
         self._registered = weakref.WeakSet()
         self._stopped_serving = weakref.WeakSet()
 
+    def __repr__(self):
+        return ('<%s overlapped#=%s result#=%s>'
+                % (self.__class__.__name__, len(self._cache),
+                   len(self._results)))
+
     def set_loop(self, loop):
         self._loop = loop
 
@@ -353,12 +373,7 @@
         f = _WaitHandleFuture(wh, loop=self._loop)
 
         def finish_wait_for_handle(trans, key, ov):
-            if not f.cancelled():
-                try:
-                    _overlapped.UnregisterWait(wh)
-                except OSError as e:
-                    if e.winerror != _overlapped.ERROR_IO_PENDING:
-                        raise
+            f._unregister()
             # Note that this second wait means that we should only use
             # this with handles types where a successful wait has no
             # effect.  So events or processes are all right, but locks
@@ -455,7 +470,7 @@
 
     def close(self):
         # Cancel remaining registered operations.
-        for address, (f, ov, obj, callback) in list(self._cache.items()):
+        for address, (fut, ov, obj, callback) in list(self._cache.items()):
             if obj is None:
                 # The operation was started with connect_pipe() which
                 # queues a task to Windows' thread pool.  This cannot
@@ -463,9 +478,17 @@
                 del self._cache[address]
             else:
                 try:
-                    ov.cancel()
-                except OSError:
-                    pass
+                    fut.cancel()
+                except OSError as exc:
+                    if self._loop is not None:
+                        context = {
+                            'message': 'Cancelling a future failed',
+                            'exception': exc,
+                            'future': fut,
+                        }
+                        if fut._source_traceback:
+                            context['source_traceback'] = fut._source_traceback
+                        self._loop.call_exception_handler(context)
 
         while self._cache:
             if not self._poll(1):
@@ -476,6 +499,9 @@
             _winapi.CloseHandle(self._iocp)
             self._iocp = None
 
+    def __del__(self):
+        self.close()
+
 
 class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
 
diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py
--- a/Lib/test/test_asyncio/test_windows_events.py
+++ b/Lib/test/test_asyncio/test_windows_events.py
@@ -96,36 +96,46 @@
 
         # Wait for unset event with 0.5s timeout;
         # result should be False at timeout
-        f = self.loop._proactor.wait_for_handle(event, 0.5)
+        fut = self.loop._proactor.wait_for_handle(event, 0.5)
         start = self.loop.time()
-        self.loop.run_until_complete(f)
+        self.loop.run_until_complete(fut)
         elapsed = self.loop.time() - start
-        self.assertFalse(f.result())
+        self.assertFalse(fut.result())
         self.assertTrue(0.48 < elapsed < 0.9, elapsed)
 
         _overlapped.SetEvent(event)
 
         # Wait for for set event;
         # result should be True immediately
-        f = self.loop._proactor.wait_for_handle(event, 10)
+        fut = self.loop._proactor.wait_for_handle(event, 10)
         start = self.loop.time()
-        self.loop.run_until_complete(f)
+        self.loop.run_until_complete(fut)
         elapsed = self.loop.time() - start
-        self.assertTrue(f.result())
+        self.assertTrue(fut.result())
         self.assertTrue(0 <= elapsed < 0.3, elapsed)
 
-        _overlapped.ResetEvent(event)
+        # Tulip issue #195: cancelling a done _WaitHandleFuture must not crash
+        fut.cancel()
+
+    def test_wait_for_handle_cancel(self):
+        event = _overlapped.CreateEvent(None, True, False, None)
+        self.addCleanup(_winapi.CloseHandle, event)
 
         # Wait for unset event with a cancelled future;
         # CancelledError should be raised immediately
-        f = self.loop._proactor.wait_for_handle(event, 10)
-        f.cancel()
+        fut = self.loop._proactor.wait_for_handle(event, 10)
+        fut.cancel()
         start = self.loop.time()
         with self.assertRaises(asyncio.CancelledError):
-            self.loop.run_until_complete(f)
+            self.loop.run_until_complete(fut)
         elapsed = self.loop.time() - start
         self.assertTrue(0 <= elapsed < 0.1, elapsed)
 
+        # Tulip issue #195: cancelling a _WaitHandleFuture twice must not crash
+        fut = self.loop._proactor.wait_for_handle(event)
+        fut.cancel()
+        fut.cancel()
+
 
 if __name__ == '__main__':
     unittest.main()

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list