[Python-checkins] bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)

Pablo Galindo webhook-mailer at python.org
Sat Mar 16 18:34:27 EDT 2019


https://github.com/python/cpython/commit/7c994549dcffd0d9d3bb37475e6374f356e7240e
commit: 7c994549dcffd0d9d3bb37475e6374f356e7240e
branch: master
author: Pablo Galindo <Pablogsal at gmail.com>
committer: GitHub <noreply at github.com>
date: 2019-03-16T22:34:24Z
summary:

bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool (#11488)

* bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool

* Use self-pipe pattern to avoid polling for changes

* Refactor some variable names and add comments

* Restore timeout and poll

* Use reader object only on wait()

* Recompute worker sentinels every time

* Remove timeout and use change notifier

* Refactor some methods to be overridden by the ThreadPool, document the cache class and fix typos

files:
A Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
M Lib/multiprocessing/pool.py

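The change described above replaces fixed-interval sleeping with blocking on each worker's Process.sentinel via multiprocessing.connection.wait(). A minimal, self-contained sketch of that primitive (not code from this commit; the work() helper and the timings are only illustrative):

    import multiprocessing as mp
    import time
    from multiprocessing.connection import wait

    def work(seconds):
        time.sleep(seconds)

    if __name__ == '__main__':
        procs = [mp.Process(target=work, args=(n,)) for n in (1, 2, 3)]
        for p in procs:
            p.start()

        while procs:
            # Each Process.sentinel becomes ready when its process ends, so
            # wait() blocks here without any sleep/poll interval.
            ready = wait([p.sentinel for p in procs])
            procs = [p for p in procs if p.sentinel not in ready]
            print(f"{len(ready)} worker(s) exited, {len(procs)} still running")

The patch waits on the same kind of sentinel list in _handle_workers, with the pool's result-queue reader and the new change notifier added to the wait set.
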
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 18a56f8524b4..665ca067fa07 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -21,11 +21,13 @@
 import time
 import traceback
 import warnings
+from queue import Empty
 
 # If threading is available then ThreadPool should be provided.  Therefore
 # we avoid top-level imports which are liable to fail on some systems.
 from . import util
 from . import get_context, TimeoutError
+from .connection import wait
 
 #
 # Constants representing the state of a pool
@@ -145,6 +147,29 @@ def _helper_reraises_exception(ex):
 # Class representing a process pool
 #
 
+class _PoolCache(dict):
+    """
+    Class that implements a cache for the Pool class that will notify
+    the pool management threads every time the cache is emptied. The
+    notification is done by the use of a queue that is provided when
+    instantiating the cache.
+    """
+    def __init__(self, *args, notifier=None, **kwds):
+        self.notifier = notifier
+        super().__init__(*args, **kwds)
+
+    def __delitem__(self, item):
+        super().__delitem__(item)
+
+        # Notify that the cache is empty. This is important because the
+        # pool keeps maintaining workers until the cache gets drained. This
+        # eliminates a race condition in which a task is finished after
+        # the pool's _handle_workers method has entered another iteration
+        # of the loop. In this situation, the only event that can wake up
+        # the pool is the cache being emptied (no more tasks available).
+        if not self:
+            self.notifier.put(None)
+
 class Pool(object):
     '''
     Class which supports an async version of applying functions to arguments.
@@ -165,7 +190,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
         self._ctx = context or get_context()
         self._setup_queues()
         self._taskqueue = queue.SimpleQueue()
-        self._cache = {}
+        # The _change_notifier queue exists to wake up self._handle_workers()
+        # when the cache (self._cache) is empty or when there is a change in
+        # the _state variable of the thread that runs _handle_workers.
+        self._change_notifier = self._ctx.SimpleQueue()
+        self._cache = _PoolCache(notifier=self._change_notifier)
         self._maxtasksperchild = maxtasksperchild
         self._initializer = initializer
         self._initargs = initargs
@@ -189,12 +218,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),
                 p.join()
             raise
 
+        sentinels = self._get_sentinels()
+
         self._worker_handler = threading.Thread(
             target=Pool._handle_workers,
             args=(self._cache, self._taskqueue, self._ctx, self.Process,
                   self._processes, self._pool, self._inqueue, self._outqueue,
                   self._initializer, self._initargs, self._maxtasksperchild,
-                  self._wrap_exception)
+                  self._wrap_exception, sentinels, self._change_notifier)
             )
         self._worker_handler.daemon = True
         self._worker_handler._state = RUN
@@ -221,7 +252,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
         self._terminate = util.Finalize(
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
-                  self._worker_handler, self._task_handler,
+                  self._change_notifier, self._worker_handler, self._task_handler,
                   self._result_handler, self._cache),
             exitpriority=15
             )
@@ -233,6 +264,8 @@ def __del__(self, _warn=warnings.warn, RUN=RUN):
         if self._state == RUN:
             _warn(f"unclosed running multiprocessing pool {self!r}",
                   ResourceWarning, source=self)
+            if getattr(self, '_change_notifier', None) is not None:
+                self._change_notifier.put(None)
 
     def __repr__(self):
         cls = self.__class__
@@ -240,6 +273,16 @@ def __repr__(self):
                 f'state={self._state} '
                 f'pool_size={len(self._pool)}>')
 
+    def _get_sentinels(self):
+        task_queue_sentinels = [self._outqueue._reader]
+        self_notifier_sentinels = [self._change_notifier._reader]
+        return [*task_queue_sentinels, *self_notifier_sentinels]
+
+    @staticmethod
+    def _get_worker_sentinels(workers):
+        return [worker.sentinel for worker in
+                workers if hasattr(worker, "sentinel")]
+
     @staticmethod
     def _join_exited_workers(pool):
         """Cleanup after any worker processes which have exited due to reaching
@@ -452,18 +495,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
         return result
 
     @staticmethod
-    def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
-                        inqueue, outqueue, initializer, initargs,
-                        maxtasksperchild, wrap_exception):
+    def _wait_for_updates(sentinels, change_notifier, timeout=None):
+        wait(sentinels, timeout=timeout)
+        while not change_notifier.empty():
+            change_notifier.get()
+
+    @classmethod
+    def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
+                        pool, inqueue, outqueue, initializer, initargs,
+                        maxtasksperchild, wrap_exception, sentinels,
+                        change_notifier):
         thread = threading.current_thread()
 
         # Keep maintaining workers until the cache gets drained, unless the pool
         # is terminated.
         while thread._state == RUN or (cache and thread._state != TERMINATE):
-            Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
-                                outqueue, initializer, initargs,
-                                maxtasksperchild, wrap_exception)
-            time.sleep(0.1)
+            cls._maintain_pool(ctx, Process, processes, pool, inqueue,
+                               outqueue, initializer, initargs,
+                               maxtasksperchild, wrap_exception)
+
+            current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]
+
+            cls._wait_for_updates(current_sentinels, change_notifier)
         # send sentinel to stop workers
         taskqueue.put(None)
         util.debug('worker handler exiting')
@@ -593,11 +646,13 @@ def close(self):
         if self._state == RUN:
             self._state = CLOSE
             self._worker_handler._state = CLOSE
+            self._change_notifier.put(None)
 
     def terminate(self):
         util.debug('terminating pool')
         self._state = TERMINATE
         self._worker_handler._state = TERMINATE
+        self._change_notifier.put(None)
         self._terminate()
 
     def join(self):
@@ -622,7 +677,7 @@ def _help_stuff_finish(inqueue, task_handler, size):
             time.sleep(0)
 
     @classmethod
-    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
+    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
                         worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
         util.debug('finalizing pool')
@@ -638,6 +693,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                 "Cannot have cache with result_hander not alive")
 
         result_handler._state = TERMINATE
+        change_notifier.put(None)
         outqueue.put(None)                  # sentinel
 
         # We must wait for the worker handler to exit before terminating
@@ -871,6 +927,13 @@ def _setup_queues(self):
         self._quick_put = self._inqueue.put
         self._quick_get = self._outqueue.get
 
+    def _get_sentinels(self):
+        return [self._change_notifier._reader]
+
+    @staticmethod
+    def _get_worker_sentinels(workers):
+        return []
+
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
         # drain inqueue, and put sentinels at its head to make workers finish
@@ -881,3 +944,6 @@ def _help_stuff_finish(inqueue, task_handler, size):
             pass
         for i in range(size):
             inqueue.put(None)
+
+    def _wait_for_updates(self, sentinels, change_notifier, timeout):
+        time.sleep(timeout)
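
The _change_notifier introduced above acts as a self-pipe style wake-up channel: its reader end is part of the object list handed to connection.wait(), so close(), terminate(), _terminate_pool() and an emptied cache can interrupt the worker handler immediately by putting a sentinel value, rather than relying on a timeout. A standalone sketch of that pattern using only public multiprocessing APIs (the watcher() helper, the Pipe standing in for the SimpleQueue's internal _reader, and the 60-second sleep are assumptions for illustration, not code from the commit):

    import threading
    import time
    import multiprocessing as mp
    from multiprocessing.connection import wait

    def watcher(sentinels, wakeup_reader):
        # Block on the worker sentinels *and* the wake-up pipe in one call.
        ready = wait([*sentinels, wakeup_reader])
        if wakeup_reader in ready:
            wakeup_reader.recv()            # drain the notification
            print("woken by the notifier")
        else:
            print("a worker process exited")

    if __name__ == '__main__':
        reader, writer = mp.Pipe(duplex=False)
        worker = mp.Process(target=time.sleep, args=(60,))
        worker.start()

        t = threading.Thread(target=watcher, args=([worker.sentinel], reader))
        t.start()

        writer.send(None)   # analogous to self._change_notifier.put(None)
        t.join()
        worker.terminate()
        worker.join()

Using a Pipe keeps the sketch on documented APIs; the commit obtains an equivalent readable handle from the SimpleQueue's _reader attribute.
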
diff --git a/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
new file mode 100644
index 000000000000..fa408c8163b7
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2019-01-09-23-43-08.bpo-35493.kEcRGE.rst
@@ -0,0 +1,3 @@
+Use :func:`multiprocessing.connection.wait` instead of polling every 0.2
+seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo
+Galindo.
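
The cache-notification idea documented in the _PoolCache docstring can likewise be reduced to a few lines: a dict that pushes a token onto a queue whenever a deletion leaves it empty, so a thread waiting on that queue learns there is no outstanding work left. A simplified, hypothetical re-creation (NotifyingCache is an illustrative name, not the class added by the patch):

    import queue

    class NotifyingCache(dict):
        """Dict that reports "now empty" through a notifier queue."""

        def __init__(self, *args, notifier=None, **kwds):
            self.notifier = notifier
            super().__init__(*args, **kwds)

        def __delitem__(self, key):
            super().__delitem__(key)
            if not self:                 # the last pending entry was just removed
                self.notifier.put(None)  # wake whoever waits for the drain

    notifier = queue.SimpleQueue()
    cache = NotifyingCache(notifier=notifier)
    cache[1] = "pending result"
    del cache[1]
    print(notifier.get())  # -> None: the "cache drained" notification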
