[Python-checkins] cpython: Issue #18934: multiprocessing: use selectors module.
charles-francois.natali
python-checkins at python.org
Thu Sep 5 20:47:12 CEST 2013
http://hg.python.org/cpython/rev/81f0c6358a5f
changeset: 85551:81f0c6358a5f
parent: 85549:c41c68a18bb6
user: Charles-François Natali <cf.natali at gmail.com>
date: Thu Sep 05 20:46:49 2013 +0200
summary:
Issue #18934: multiprocessing: use selectors module.
files:
Lib/multiprocessing/connection.py | 51 +++++-------------
Lib/multiprocessing/forkserver.py | 14 +++-
2 files changed, 26 insertions(+), 39 deletions(-)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -12,7 +12,6 @@
import io
import os
import sys
-import select
import socket
import struct
import errno
@@ -877,28 +876,7 @@
else:
- if hasattr(select, 'poll'):
- def _poll(fds, timeout):
- if timeout is not None:
- timeout = int(timeout * 1000) # timeout is in milliseconds
- fd_map = {}
- pollster = select.poll()
- for fd in fds:
- pollster.register(fd, select.POLLIN)
- if hasattr(fd, 'fileno'):
- fd_map[fd.fileno()] = fd
- else:
- fd_map[fd] = fd
- ls = []
- for fd, event in pollster.poll(timeout):
- if event & select.POLLNVAL:
- raise ValueError('invalid file descriptor %i' % fd)
- ls.append(fd_map[fd])
- return ls
- else:
- def _poll(fds, timeout):
- return select.select(fds, [], [], timeout)[0]
-
+ import selectors
def wait(object_list, timeout=None):
'''
@@ -906,19 +884,22 @@
Returns list of those objects in object_list which are ready/readable.
'''
- if timeout is not None:
- if timeout <= 0:
- return _poll(object_list, 0)
- else:
+ with selectors.DefaultSelector() as selector:
+ for obj in object_list:
+ selector.register(obj, selectors.EVENT_READ)
+
+ if timeout is not None:
deadline = time.time() + timeout
- while True:
- try:
- return _poll(object_list, timeout)
- except OSError as e:
- if e.errno != errno.EINTR:
- raise
- if timeout is not None:
- timeout = deadline - time.time()
+
+ while True:
+ ready = selector.select(timeout)
+ if ready:
+ return [key.fileobj for (key, events) in ready]
+ else:
+ if timeout is not None:
+ timeout = deadline - time.time()
+ if timeout < 0:
+ return ready
#
# Make connection and socket objects sharable if possible
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -1,6 +1,6 @@
import errno
import os
-import select
+import selectors
import signal
import socket
import struct
@@ -149,14 +149,20 @@
# ignoring SIGCHLD means no need to reap zombie processes
handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
- with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+ with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
+ selectors.DefaultSelector() as selector:
global _forkserver_address
_forkserver_address = listener.getsockname()
- readers = [listener, alive_r]
+
+ selector.register(listener, selectors.EVENT_READ)
+ selector.register(alive_r, selectors.EVENT_READ)
while True:
try:
- rfds, wfds, xfds = select.select(readers, [], [])
+ while True:
+ rfds = [key.fileobj for (key, events) in selector.select()]
+ if rfds:
+ break
if alive_r in rfds:
# EOF because no more client processes left
--
Repository URL: http://hg.python.org/cpython
More information about the Python-checkins
mailing list