[Python-checkins] bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247)

Antoine Pitrou webhook-mailer at python.org
Fri Nov 3 09:31:41 EDT 2017


https://github.com/python/cpython/commit/cbe1756e3ecefc0e24a5d0a4b8663db9b6d0cc52
commit: cbe1756e3ecefc0e24a5d0a4b8663db9b6d0cc52
branch: master
author: Antoine Pitrou <pitrou at free.fr>
committer: GitHub <noreply at github.com>
date: 2017-11-03T14:31:38+01:00
summary:

bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (#3247)

* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed

* Avoid mucking with process state in test.
Add a warning if the semaphore tracker process died, as semaphores may then be leaked.

* Add NEWS entry

files:
A Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
M Lib/multiprocessing/semaphore_tracker.py
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index d5f259c246b..3e31bf8402e 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -29,6 +29,7 @@ class SemaphoreTracker(object):
     def __init__(self):
         self._lock = threading.Lock()
         self._fd = None
+        self._pid = None
 
     def getfd(self):
         self.ensure_running()
@@ -40,8 +41,20 @@ def ensure_running(self):
         This can be run from any process.  Usually a child process will use
         the semaphore created by its parent.'''
         with self._lock:
-            if self._fd is not None:
-                return
+            if self._pid is not None:
+                # semaphore tracker was launched before, is it still running?
+                pid, status = os.waitpid(self._pid, os.WNOHANG)
+                if not pid:
+                    # => still alive
+                    return
+                # => dead, launch it again
+                os.close(self._fd)
+                self._fd = None
+                self._pid = None
+
+                warnings.warn('semaphore_tracker: process died unexpectedly, '
+                              'relaunching.  Some semaphores might leak.')
+
             fds_to_pass = []
             try:
                 fds_to_pass.append(sys.stderr.fileno())
@@ -55,12 +68,13 @@ def ensure_running(self):
                 exe = spawn.get_executable()
                 args = [exe] + util._args_from_interpreter_flags()
                 args += ['-c', cmd % r]
-                util.spawnv_passfds(exe, args, fds_to_pass)
+                pid = util.spawnv_passfds(exe, args, fds_to_pass)
             except:
                 os.close(w)
                 raise
             else:
                 self._fd = w
+                self._pid = pid
             finally:
                 os.close(r)
 
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 799146d8a3f..d4e8a8a7e1a 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4,6 +4,7 @@
 
 import unittest
 import queue as pyqueue
+import contextlib
 import time
 import io
 import itertools
@@ -4344,14 +4345,14 @@ def test_preload_resources(self):
             self.fail("failed spawning forkserver or grandchild")
 
 
-#
-# Check that killing process does not leak named semaphores
-#
-
 @unittest.skipIf(sys.platform == "win32",
                  "test semantics don't make sense on Windows")
 class TestSemaphoreTracker(unittest.TestCase):
+
     def test_semaphore_tracker(self):
+        #
+        # Check that killing process does not leak named semaphores
+        #
         import subprocess
         cmd = '''if 1:
             import multiprocessing as mp, time, os
@@ -4385,6 +4386,40 @@ def test_semaphore_tracker(self):
         self.assertRegex(err, expected)
         self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
 
+    def check_semaphore_tracker_death(self, signum, should_die):
+        # bpo-31310: if the semaphore tracker process has died, it should
+        # be restarted implicitly.
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        pid = _semaphore_tracker._pid
+        os.kill(pid, signum)
+        time.sleep(1.0)  # give it time to die
+
+        ctx = multiprocessing.get_context("spawn")
+        with contextlib.ExitStack() as stack:
+            if should_die:
+                stack.enter_context(self.assertWarnsRegex(
+                    UserWarning,
+                    "semaphore_tracker: process died"))
+            sem = ctx.Semaphore()
+            sem.acquire()
+            sem.release()
+            wr = weakref.ref(sem)
+            # ensure `sem` gets collected, which triggers communication with
+            # the semaphore tracker
+            del sem
+            gc.collect()
+            self.assertIsNone(wr())
+
+    def test_semaphore_tracker_sigint(self):
+        # Catchable signal (ignored by semaphore tracker)
+        self.check_semaphore_tracker_death(signal.SIGINT, False)
+
+    def test_semaphore_tracker_sigkill(self):
+        # Uncatchable signal.
+        self.check_semaphore_tracker_death(signal.SIGKILL, True)
+
+
 class TestSimpleQueue(unittest.TestCase):
 
     @classmethod
diff --git a/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
new file mode 100644
index 00000000000..4d340f07364
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst
@@ -0,0 +1 @@
+multiprocessing's semaphore tracker should be launched again if crashed.



More information about the Python-checkins mailing list