[Python-checkins] cpython (3.4): Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe()

victor.stinner python-checkins at python.org
Thu Jan 22 22:58:44 CET 2015


https://hg.python.org/cpython/rev/1e3a1af0705f
changeset:   94241:1e3a1af0705f
branch:      3.4
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Jan 22 22:55:08 2015 +0100
summary:
  Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe()

Add _overlapped.ConnectPipe(), which tries to connect to the pipe for
asynchronous (overlapped) I/O by calling CreateFile() once.
IocpProactor.connect_pipe() calls it in a loop until it no longer fails with
ERROR_PIPE_BUSY, using an increasing delay between 1 ms and 100 ms.

Remove Overlapped.WaitNamedPipeAndConnect(), which is no longer used.

files:
  Lib/asyncio/windows_events.py |   45 +++++---
  Modules/overlapped.c          |  115 ++++-----------------
  2 files changed, 49 insertions(+), 111 deletions(-)


diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -29,6 +29,12 @@
 ERROR_CONNECTION_REFUSED = 1225
 ERROR_CONNECTION_ABORTED = 1236
 
+# Initial delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_INIT_DELAY = 0.001
+
+# Maximum delay in seconds for connect_pipe() before retrying to connect
+CONNECT_PIPE_MAX_DELAY = 0.100
+
 
 class _OverlappedFuture(futures.Future):
     """Subclass of Future which represents an overlapped operation.
@@ -495,25 +501,28 @@
         return self._register(ov, pipe, finish_accept_pipe,
                               register=False)
 
+    def _connect_pipe(self, fut, address, delay):
+        # Unfortunately there is no way to do an overlapped connect to a pipe.
+        # Call CreateFile() in a loop until it doesn't fail with
+        # ERROR_PIPE_BUSY
+        try:
+            handle = _overlapped.ConnectPipe(address)
+        except OSError as exc:
+            if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
+                # Polling: retry later
+                delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
+                self._loop.call_later(delay,
+                                      self._connect_pipe, fut, address, delay)
+            else:
+                fut.set_exception(exc)
+        else:
+            pipe = windows_utils.PipeHandle(handle)
+            fut.set_result(pipe)
+
     def connect_pipe(self, address):
-        ov = _overlapped.Overlapped(NULL)
-        ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
-
-        def finish_connect_pipe(err, handle, ov):
-            # err, handle were arguments passed to PostQueuedCompletionStatus()
-            # in a function run in a thread pool.
-            if err == _overlapped.ERROR_SEM_TIMEOUT:
-                # Connection did not succeed within time limit.
-                msg = _overlapped.FormatMessage(err)
-                raise ConnectionRefusedError(0, msg, None, err)
-            elif err != 0:
-                msg = _overlapped.FormatMessage(err)
-                raise OSError(0, msg, None, err)
-            else:
-                return windows_utils.PipeHandle(handle)
-
-        return self._register(ov, None, finish_connect_pipe,
-                              wait_for_post=True)
+        fut = futures.Future(loop=self._loop)
+        self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
+        return fut
 
     def wait_for_handle(self, handle, timeout=None):
         """Wait for a handle.
diff --git a/Modules/overlapped.c b/Modules/overlapped.c
--- a/Modules/overlapped.c
+++ b/Modules/overlapped.c
@@ -52,12 +52,6 @@
     };
 } OverlappedObject;
 
-typedef struct {
-    OVERLAPPED *Overlapped;
-    HANDLE IocpHandle;
-    char Address[1];
-} WaitNamedPipeAndConnectContext;
-
 /*
  * Map Windows error codes to subclasses of OSError
  */
@@ -1133,99 +1127,33 @@
     }
 }
 
-/* Unfortunately there is no way to do an overlapped connect to a
-   pipe.  We instead use WaitNamedPipe() and CreateFile() in a thread
-   pool thread.  If a connection succeeds within a time limit (10
-   seconds) then PostQueuedCompletionStatus() is used to return the
-   pipe handle to the completion port. */
-
-static DWORD WINAPI
-WaitNamedPipeAndConnectInThread(WaitNamedPipeAndConnectContext *ctx)
-{
-    HANDLE PipeHandle = INVALID_HANDLE_VALUE;
-    DWORD Start = GetTickCount();
-    DWORD Deadline = Start + 10*1000;
-    DWORD Error = 0;
-    DWORD Timeout;
-    BOOL Success;
-
-    for ( ; ; ) {
-        Timeout = Deadline - GetTickCount();
-        if ((int)Timeout < 0)
-            break;
-        Success = WaitNamedPipe(ctx->Address, Timeout);
-        Error = Success ? ERROR_SUCCESS : GetLastError();
-        switch (Error) {
-            case ERROR_SUCCESS:
-                PipeHandle = CreateFile(ctx->Address,
-                                        GENERIC_READ | GENERIC_WRITE,
-                                        0, NULL, OPEN_EXISTING,
-                                        FILE_FLAG_OVERLAPPED, NULL);
-                if (PipeHandle == INVALID_HANDLE_VALUE)
-                    continue;
-                break;
-            case ERROR_SEM_TIMEOUT:
-                continue;
-        }
-        break;
-    }
-    if (!PostQueuedCompletionStatus(ctx->IocpHandle, Error,
-                                    (ULONG_PTR)PipeHandle, ctx->Overlapped))
-        CloseHandle(PipeHandle);
-    free(ctx);
-    return 0;
-}
-
 PyDoc_STRVAR(
-    Overlapped_WaitNamedPipeAndConnect_doc,
-    "WaitNamedPipeAndConnect(addr, iocp_handle) -> Overlapped[pipe_handle]\n\n"
-    "Start overlapped connection to address, notifying iocp_handle when\n"
-    "finished");
+    ConnectPipe_doc,
+    "ConnectPipe(addr) -> pipe_handle\n\n"
+    "Connect to the pipe for asynchronous I/O (overlapped).");
 
 static PyObject *
-Overlapped_WaitNamedPipeAndConnect(OverlappedObject *self, PyObject *args)
+ConnectPipe(OverlappedObject *self, PyObject *args)
 {
-    char *Address;
-    Py_ssize_t AddressLength;
-    HANDLE IocpHandle;
-    OVERLAPPED Overlapped;
-    BOOL ret;
-    DWORD err;
-    WaitNamedPipeAndConnectContext *ctx;
-    Py_ssize_t ContextLength;
+    PyObject *AddressObj;
+    wchar_t *Address;
+    HANDLE PipeHandle;
 
-    if (!PyArg_ParseTuple(args, "s#" F_HANDLE F_POINTER,
-                          &Address, &AddressLength, &IocpHandle, &Overlapped))
+    if (!PyArg_ParseTuple(args, "U",  &AddressObj))
         return NULL;
 
-    if (self->type != TYPE_NONE) {
-        PyErr_SetString(PyExc_ValueError, "operation already attempted");
+    Address = PyUnicode_AsWideCharString(AddressObj, NULL);
+    if (Address == NULL)
         return NULL;
-    }
 
-    ContextLength = (AddressLength +
-                     offsetof(WaitNamedPipeAndConnectContext, Address));
-    ctx = calloc(1, ContextLength + 1);
-    if (ctx == NULL)
-        return PyErr_NoMemory();
-    memcpy(ctx->Address, Address, AddressLength + 1);
-    ctx->Overlapped = &self->overlapped;
-    ctx->IocpHandle = IocpHandle;
-
-    self->type = TYPE_WAIT_NAMED_PIPE_AND_CONNECT;
-    self->handle = NULL;
-
-    Py_BEGIN_ALLOW_THREADS
-    ret = QueueUserWorkItem(WaitNamedPipeAndConnectInThread, ctx,
-                            WT_EXECUTELONGFUNCTION);
-    Py_END_ALLOW_THREADS
-
-    mark_as_completed(&self->overlapped);
-
-    self->error = err = ret ? ERROR_SUCCESS : GetLastError();
-    if (!ret)
-        return SetFromWindowsErr(err);
-    Py_RETURN_NONE;
+    PipeHandle = CreateFileW(Address,
+                             GENERIC_READ | GENERIC_WRITE,
+                             0, NULL, OPEN_EXISTING,
+                             FILE_FLAG_OVERLAPPED, NULL);
+    PyMem_Free(Address);
+    if (PipeHandle == INVALID_HANDLE_VALUE)
+        return SetFromWindowsErr(0);
+    return Py_BuildValue(F_HANDLE, PipeHandle);
 }
 
 static PyObject*
@@ -1262,9 +1190,6 @@
      METH_VARARGS, Overlapped_DisconnectEx_doc},
     {"ConnectNamedPipe", (PyCFunction) Overlapped_ConnectNamedPipe,
      METH_VARARGS, Overlapped_ConnectNamedPipe_doc},
-    {"WaitNamedPipeAndConnect",
-     (PyCFunction) Overlapped_WaitNamedPipeAndConnect,
-     METH_VARARGS, Overlapped_WaitNamedPipeAndConnect_doc},
     {NULL}
 };
 
@@ -1350,6 +1275,9 @@
      METH_VARARGS, SetEvent_doc},
     {"ResetEvent", overlapped_ResetEvent,
      METH_VARARGS, ResetEvent_doc},
+    {"ConnectPipe",
+     (PyCFunction) ConnectPipe,
+     METH_VARARGS, ConnectPipe_doc},
     {NULL}
 };
 
@@ -1394,6 +1322,7 @@
     WINAPI_CONSTANT(F_DWORD,  ERROR_IO_PENDING);
     WINAPI_CONSTANT(F_DWORD,  ERROR_NETNAME_DELETED);
     WINAPI_CONSTANT(F_DWORD,  ERROR_SEM_TIMEOUT);
+    WINAPI_CONSTANT(F_DWORD,  ERROR_PIPE_BUSY);
     WINAPI_CONSTANT(F_DWORD,  INFINITE);
     WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
     WINAPI_CONSTANT(F_HANDLE, NULL);

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list