[Python-checkins] cpython: Issue #18934: multiprocessing: use selectors module.

charles-francois.natali python-checkins at python.org
Thu Sep 5 20:47:12 CEST 2013


http://hg.python.org/cpython/rev/81f0c6358a5f
changeset:   85551:81f0c6358a5f
parent:      85549:c41c68a18bb6
user:        Charles-François Natali <cf.natali at gmail.com>
date:        Thu Sep 05 20:46:49 2013 +0200
summary:
  Issue #18934: multiprocessing: use selectors module.

files:
  Lib/multiprocessing/connection.py |  51 +++++-------------
  Lib/multiprocessing/forkserver.py |  14 +++-
  2 files changed, 26 insertions(+), 39 deletions(-)


diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@
 import io
 import os
 import sys
-import select
 import socket
 import struct
 import errno
@@ -877,28 +876,7 @@
 
 else:
 
-    if hasattr(select, 'poll'):
-        def _poll(fds, timeout):
-            if timeout is not None:
-                timeout = int(timeout * 1000)  # timeout is in milliseconds
-            fd_map = {}
-            pollster = select.poll()
-            for fd in fds:
-                pollster.register(fd, select.POLLIN)
-                if hasattr(fd, 'fileno'):
-                    fd_map[fd.fileno()] = fd
-                else:
-                    fd_map[fd] = fd
-            ls = []
-            for fd, event in pollster.poll(timeout):
-                if event & select.POLLNVAL:
-                    raise ValueError('invalid file descriptor %i' % fd)
-                ls.append(fd_map[fd])
-            return ls
-    else:
-        def _poll(fds, timeout):
-            return select.select(fds, [], [], timeout)[0]
-
+    import selectors
 
     def wait(object_list, timeout=None):
         '''
@@ -906,19 +884,22 @@
 
         Returns list of those objects in object_list which are ready/readable.
         '''
-        if timeout is not None:
-            if timeout <= 0:
-                return _poll(object_list, 0)
-            else:
+        with selectors.DefaultSelector() as selector:
+            for obj in object_list:
+                selector.register(obj, selectors.EVENT_READ)
+
+            if timeout is not None:
                 deadline = time.time() + timeout
-        while True:
-            try:
-                return _poll(object_list, timeout)
-            except OSError as e:
-                if e.errno != errno.EINTR:
-                    raise
-            if timeout is not None:
-                timeout = deadline - time.time()
+
+            while True:
+                ready = selector.select(timeout)
+                if ready:
+                    return [key.fileobj for (key, events) in ready]
+                else:
+                    if timeout is not None:
+                        timeout = deadline - time.time()
+                        if timeout < 0:
+                            return ready
 
 #
 # Make connection and socket objects sharable if possible
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -1,6 +1,6 @@
 import errno
 import os
-import select
+import selectors
 import signal
 import socket
 import struct
@@ -149,14 +149,20 @@
 
     # ignoring SIGCHLD means no need to reap zombie processes
     handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
-    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
+         selectors.DefaultSelector() as selector:
         global _forkserver_address
         _forkserver_address = listener.getsockname()
-        readers = [listener, alive_r]
+
+        selector.register(listener, selectors.EVENT_READ)
+        selector.register(alive_r, selectors.EVENT_READ)
 
         while True:
             try:
-                rfds, wfds, xfds = select.select(readers, [], [])
+                while True:
+                    rfds = [key.fileobj for (key, events) in selector.select()]
+                    if rfds:
+                        break
 
                 if alive_r in rfds:
                     # EOF because no more client processes left

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list