[Python-checkins] cpython: Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor

antoine.pitrou python-checkins at python.org
Sat Jul 2 21:21:14 CEST 2011


http://hg.python.org/cpython/rev/51c1f2cedb96
changeset:   71137:51c1f2cedb96
user:        Antoine Pitrou <solipsis at pitrou.net>
date:        Sat Jul 02 21:20:25 2011 +0200
summary:
  Issue #12456: fix a possible hang on shutdown of a concurrent.futures.ProcessPoolExecutor.

files:
  Lib/concurrent/futures/process.py   |  30 +++++++++++-----
  Lib/test/test_concurrent_futures.py |   7 +++
  2 files changed, 28 insertions(+), 9 deletions(-)


diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,7 @@
 from concurrent.futures import _base
 import queue
 import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady
+from multiprocessing.queues import SimpleQueue, SentinelReady, Full
 import threading
 import weakref
 
@@ -195,6 +195,10 @@
         result_queue: A multiprocessing.Queue of _ResultItems generated by the
             process workers.
     """
+    executor = None
+
+    def shutting_down():
+        return _shutdown or executor is None or executor._shutdown_thread
 
     def shutdown_worker():
         # This is an upper bound
@@ -202,8 +206,7 @@
         for i in range(0, nb_children_alive):
             call_queue.put(None)
         # If .join() is not called on the created processes then
-        # some multiprocessing.Queue methods may deadlock on Mac OS
-        # X.
+        # some multiprocessing.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
             p.join()
 
@@ -222,7 +225,7 @@
             if executor is not None:
                 executor._broken = True
                 executor._shutdown_thread = True
-                del executor
+                executor = None
             # All futures in flight must be marked failed
             for work_id, work_item in pending_work_items.items():
                 work_item.future.set_exception(
@@ -242,7 +245,11 @@
         if isinstance(result_item, int):
             # Clean shutdown of a worker using its PID
             # (avoids marking the executor broken)
+            assert shutting_down()
             del processes[result_item]
+            if not processes:
+                shutdown_worker()
+                return
         elif result_item is not None:
             work_item = pending_work_items.pop(result_item.work_id, None)
             # work_item can be None if another process terminated (see above)
@@ -257,16 +264,21 @@
         #   - The interpreter is shutting down OR
         #   - The executor that owns this worker has been collected OR
         #   - The executor that owns this worker has been shutdown.
-        if _shutdown or executor is None or executor._shutdown_thread:
+        if shutting_down():
             # Since no new work items can be added, it is safe to shutdown
             # this thread if there are no pending work items.
-            if not pending_work_items:
+            if not pending_work_items and call_queue.qsize() == 0:
                 shutdown_worker()
                 return
-            else:
+            try:
                 # Start shutting down by telling a process it can exit.
-                call_queue.put(None)
-        del executor
+                call_queue.put_nowait(None)
+            except Full:
+                # This is not a problem: we will eventually be woken up (in
+                # result_queue.get()) and be able to send a sentinel again,
+                # if necessary.
+                pass
+        executor = None
 
 _system_limits_checked = False
 _system_limited = None
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -367,6 +367,13 @@
 
         self.assertEqual([None, None], results)
 
+    def test_shutdown_race_issue12456(self):
+        # Issue #12456: race condition at shutdown where trying to post a
+        # sentinel in the call queue blocks (the queue is full while processes
+        # have exited).
+        self.executor.map(str, [2] * (self.worker_count + 1))
+        self.executor.shutdown()
+
 
 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
     def test_map_submits_without_iteration(self):

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list