[Python-checkins] bpo-36719: Fix regrtest MultiprocessThread (GH-13301) (GH-13303)

Victor Stinner webhook-mailer at python.org
Tue May 14 08:12:52 EDT 2019


https://github.com/python/cpython/commit/d8e123a48f1666227abdb90d84c58efe7bb4f3d8
commit: d8e123a48f1666227abdb90d84c58efe7bb4f3d8
branch: 3.7
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: Victor Stinner <vstinner at redhat.com>
date: 2019-05-14T14:12:49+02:00
summary:

bpo-36719: Fix regrtest MultiprocessThread (GH-13301) (GH-13303)

MultiprocessThread.kill() now closes stdout and stderr to prevent
popen.communicate() from hanging.
(cherry picked from commit c923c3449f825021b13521b2380e67ba35a36f55)

Co-authored-by: Victor Stinner <vstinner at redhat.com>

files:
M Lib/test/libregrtest/runtest_mp.py

diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
index ced7f866a899..42178471ef1d 100644
--- a/Lib/test/libregrtest/runtest_mp.py
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -21,6 +21,9 @@
 # Display the running tests if nothing happened last N seconds
 PROGRESS_UPDATE = 30.0   # seconds
 
+# Time to wait until a worker completes: should be immediate
+JOIN_TIMEOUT = 30.0   # seconds
+
 
 def must_stop(result, ns):
     if result.result == INTERRUPTED:
@@ -91,6 +94,10 @@ def stop(self):
 MultiprocessResult = collections.namedtuple('MultiprocessResult',
     'result stdout stderr error_msg')
 
+class ExitThread(Exception):
+    pass
+
+
 class MultiprocessThread(threading.Thread):
     def __init__(self, pending, output, ns):
         super().__init__()
@@ -100,13 +107,31 @@ def __init__(self, pending, output, ns):
         self.current_test_name = None
         self.start_time = None
         self._popen = None
+        self._killed = False
+
+    def __repr__(self):
+        info = ['MultiprocessThread']
+        test = self.current_test_name
+        if self.is_alive():
+            info.append('alive')
+        if test:
+            info.append(f'test={test}')
+        popen = self._popen
+        if popen:
+            info.append(f'pid={popen.pid}')
+        return '<%s>' % ' '.join(info)
 
     def kill(self):
+        self._killed = True
+
         popen = self._popen
         if popen is None:
             return
-        print("Kill regrtest worker process %s" % popen.pid)
         popen.kill()
+        # stdout and stderr must be closed to ensure that communicate()
+        # does not hang
+        popen.stdout.close()
+        popen.stderr.close()
 
     def _runtest(self, test_name):
         try:
@@ -117,7 +142,21 @@ def _runtest(self, test_name):
             popen = self._popen
             with popen:
                 try:
-                    stdout, stderr = popen.communicate()
+                    if self._killed:
+                        # If kill() has been called before self._popen is set,
+                        # self._popen is still running. Call again kill()
+                        # to ensure that the process is killed.
+                        self.kill()
+                        raise ExitThread
+
+                    try:
+                        stdout, stderr = popen.communicate()
+                    except OSError:
+                        if self._killed:
+                            # kill() has been called: communicate() fails
+                            # on reading closed stdout/stderr
+                            raise ExitThread
+                        raise
                 except:
                     self.kill()
                     popen.wait()
@@ -154,7 +193,7 @@ def _runtest(self, test_name):
         return MultiprocessResult(result, stdout, stderr, err_msg)
 
     def run(self):
-        while True:
+        while not self._killed:
             try:
                 try:
                     test_name = next(self.pending)
@@ -166,6 +205,8 @@ def run(self):
 
                 if must_stop(mp_result.result, self.ns):
                     break
+            except ExitThread:
+                break
             except BaseException:
                 self.output.put((True, traceback.format_exc()))
                 break
@@ -205,10 +246,20 @@ def start_workers(self):
             worker.start()
 
     def wait_workers(self):
+        start_time = time.monotonic()
         for worker in self.workers:
             worker.kill()
         for worker in self.workers:
-            worker.join()
+            while True:
+                worker.join(1.0)
+                if not worker.is_alive():
+                    break
+                dt = time.monotonic() - start_time
+                print("Wait for regrtest worker %r for %.1f sec" % (worker, dt))
+                if dt > JOIN_TIMEOUT:
+                    print("Warning -- failed to join a regrtest worker %s"
+                          % worker)
+                    break
 
     def _get_result(self):
         if not any(worker.is_alive() for worker in self.workers):



More information about the Python-checkins mailing list