[Python-checkins] bpo-36668: FIX reuse semaphore tracker for child processes (#5172)

Antoine Pitrou webhook-mailer at python.org
Wed Apr 24 15:47:16 EDT 2019


https://github.com/python/cpython/commit/004b93ea8947bcbe85b6fa16fe0999bfa712d5c1
commit: 004b93ea8947bcbe85b6fa16fe0999bfa712d5c1
branch: master
author: Thomas Moreau <thomas.moreau.2010 at gmail.com>
committer: Antoine Pitrou <antoine at python.org>
date: 2019-04-24T21:45:52+02:00
summary:

bpo-36668: FIX reuse semaphore tracker for child processes (#5172)

Fix the multiprocessing.semaphore_tracker so it is reused by child processes.

files:
A Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
M Lib/multiprocessing/semaphore_tracker.py
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
index 82833bcf861a..3c2c3ad61aee 100644
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -44,20 +44,23 @@ def ensure_running(self):
         This can be run from any process.  Usually a child process will use
         the semaphore created by its parent.'''
         with self._lock:
-            if self._pid is not None:
+            if self._fd is not None:
                 # semaphore tracker was launched before, is it still running?
+                if self._check_alive():
+                    # => still alive
+                    return
+                # => dead, launch it again
+                os.close(self._fd)
+
+                # Clean-up to avoid dangling processes.
                 try:
-                    pid, _ = os.waitpid(self._pid, os.WNOHANG)
+                    # _pid can be None if this process is a child from another
+                    # python process, which has started the semaphore_tracker.
+                    if self._pid is not None:
+                        os.waitpid(self._pid, 0)
                 except ChildProcessError:
-                    # The process terminated
+                    # The semaphore_tracker has already been terminated.
                     pass
-                else:
-                    if not pid:
-                        # => still alive
-                        return
-
-                # => dead, launch it again
-                os.close(self._fd)
                 self._fd = None
                 self._pid = None
 
@@ -99,6 +102,17 @@ def ensure_running(self):
             finally:
                 os.close(r)
 
+    def _check_alive(self):
+        '''Check that the pipe has not been closed by sending a probe.'''
+        try:
+            # We cannot use send here as it calls ensure_running, creating
+            # a cycle.
+            os.write(self._fd, b'PROBE:0\n')
+        except OSError:
+            return False
+        else:
+            return True
+
     def register(self, name):
         '''Register name of semaphore with semaphore tracker.'''
         self._send('REGISTER', name)
@@ -150,6 +164,8 @@ def main(fd):
                         cache.add(name)
                     elif cmd == b'UNREGISTER':
                         cache.remove(name)
+                    elif cmd == b'PROBE':
+                        pass
                     else:
                         raise RuntimeError('unrecognized command %r' % cmd)
                 except Exception:
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 553ab8178316..836fde88cd26 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -4891,6 +4891,34 @@ def test_semaphore_tracker_sigkill(self):
         # Uncatchable signal.
         self.check_semaphore_tracker_death(signal.SIGKILL, True)
 
+    @staticmethod
+    def _is_semaphore_tracker_reused(conn, pid):
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        # The pid should be None in the child process, except for the fork
+        # context. It should not be a new value.
+        reused = _semaphore_tracker._pid in (None, pid)
+        reused &= _semaphore_tracker._check_alive()
+        conn.send(reused)
+
+    def test_semaphore_tracker_reused(self):
+        from multiprocessing.semaphore_tracker import _semaphore_tracker
+        _semaphore_tracker.ensure_running()
+        pid = _semaphore_tracker._pid
+
+        r, w = multiprocessing.Pipe(duplex=False)
+        p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
+                                    args=(w, pid))
+        p.start()
+        is_semaphore_tracker_reused = r.recv()
+
+        # Clean up
+        p.join()
+        w.close()
+        r.close()
+
+        self.assertTrue(is_semaphore_tracker_reused)
+
 
 class TestSimpleQueue(unittest.TestCase):
 
diff --git a/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
new file mode 100644
index 000000000000..32ebf4efb74b
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-04-06-11-06-23.bpo-31310.eq9ky0.rst
@@ -0,0 +1 @@
+Fix the multiprocessing.semaphore_tracker so it is reused by child processes.



More information about the Python-checkins mailing list