[Python-checkins] gh-91607: Fix several test_concurrent_futures tests to actually test what they claim (#91600)

gpshead webhook-mailer at python.org
Sat Apr 16 14:46:47 EDT 2022


https://github.com/python/cpython/commit/7fa3a5a2197896066e3fe53ee325ac6ab54c3414
commit: 7fa3a5a2197896066e3fe53ee325ac6ab54c3414
branch: main
author: Gregory P. Smith <greg at krypto.org>
committer: gpshead <greg at krypto.org>
date: 2022-04-16T11:46:33-07:00
summary:

gh-91607: Fix several test_concurrent_futures tests to actually test what they claim (#91600)

* Fix test_concurrent_futures to actually test what it says.

Many ProcessPoolExecutor-based tests were ignoring the mp_context
they were configured with and silently using the default start method
instead.  This meant we lacked proper test coverage of all of the
multiprocessing start methods.
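
To make the intent concrete, here is a minimal sketch of the pattern the
tests are moved toward (the helper name make_executor is illustrative and
not part of the commit; the mp_context keyword and mp.get_context() are
the stdlib APIs the diff below relies on):

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor

    def make_executor(start_method, max_workers=5):
        # Build the pool from an explicit start-method context instead of
        # whatever the platform default happens to be.
        ctx = mp.get_context(start_method)  # 'fork', 'spawn' or 'forkserver'
        return ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)

    if __name__ == "__main__":
        with make_executor("spawn") as executor:
            print(executor.submit(abs, -21).result())  # prints 21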

Also removes the old _prime_executor() worker delay seeding code, as
it appears to serve no purpose and adds 20-30 seconds of extra latency
to this already long test.  It also got in the way of the refactoring
above: tests no longer needlessly create their own executor when setUp
has already created an appropriate one.

* Don't import get_context from multiprocessing directly, to avoid
  confusion with the test mixin's own get_context() method.

* 📜🤖 Added by blurb_it.

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
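
As a quick illustration (not part of the commit) of why the context
actually matters: each start method hands the executor a different
Process class for its workers, which is exactly what the
per-start-method test classes are meant to exercise.

    import multiprocessing as mp

    if __name__ == "__main__":
        for method in mp.get_all_start_methods():
            ctx = mp.get_context(method)
            # e.g. fork -> ForkProcess, spawn -> SpawnProcess,
            # forkserver -> ForkServerProcess
            print(method, ctx.Process.__name__)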

files:
A Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 8adba36a387ad..978a748df7fa3 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -26,10 +26,10 @@
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
     BrokenExecutor)
 from concurrent.futures.process import BrokenProcessPool, _check_system_limits
-from multiprocessing import get_context
 
 import multiprocessing.process
 import multiprocessing.util
+import multiprocessing as mp
 
 
 if support.check_sanitizer(address=True, memory=True):
@@ -130,7 +130,6 @@ def setUp(self):
             self.executor = self.executor_type(
                 max_workers=self.worker_count,
                 **self.executor_kwargs)
-        self._prime_executor()
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
@@ -144,15 +143,7 @@ def tearDown(self):
         super().tearDown()
 
     def get_context(self):
-        return get_context(self.ctx)
-
-    def _prime_executor(self):
-        # Make sure that the executor is ready to do work before running the
-        # tests. This should reduce the probability of timeouts in the tests.
-        futures = [self.executor.submit(time.sleep, 0.1)
-                   for _ in range(self.worker_count)]
-        for f in futures:
-            f.result()
+        return mp.get_context(self.ctx)
 
 
 class ThreadPoolMixin(ExecutorMixin):
@@ -275,9 +266,6 @@ def test_initializer(self):
             with self.assertRaises(BrokenExecutor):
                 self.executor.submit(get_init_status)
 
-    def _prime_executor(self):
-        pass
-
     @contextlib.contextmanager
     def _assert_logged(self, msg):
         if self.log_queue is not None:
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
             f.result()
 
     def test_cancel_futures(self):
-        executor = self.executor_type(max_workers=3)
-        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
-        executor.shutdown(cancel_futures=True)
+        assert self.worker_count <= 5, "test needs few workers"
+        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+        self.executor.shutdown(cancel_futures=True)
         # We can't guarantee the exact number of cancellations, but we can
-        # guarantee that *some* were cancelled. With setting max_workers to 3,
-        # most of the submitted futures should have been cancelled.
+        # guarantee that *some* were cancelled. With few workers, many of
+        # the submitted futures should have been cancelled.
         cancelled = [fut for fut in fs if fut.cancelled()]
-        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+        self.assertGreater(len(cancelled), 20)
 
         # Ensure the other futures were able to finish.
         # Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
         # Similar to the number of cancelled futures, we can't guarantee the
         # exact number that completed. But, we can guarantee that at least
         # one finished.
-        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+        self.assertGreater(len(others), 0)
 
-    def test_hang_issue39205(self):
+    def test_hang_gh83386(self):
         """shutdown(wait=False) doesn't hang at exit with running futures.
 
-        See https://bugs.python.org/issue39205.
+        See https://github.com/python/cpython/issues/83386.
         """
         if self.executor_type == futures.ProcessPoolExecutor:
             raise unittest.SkipTest(
-                "Hangs due to https://bugs.python.org/issue39205")
+                "Hangs, see https://github.com/python/cpython/issues/83386")
 
         rc, out, err = assert_python_ok('-c', """if True:
             from concurrent.futures import {executor_type}
             from test.test_concurrent_futures import sleep_and_print
+            import multiprocessing  # needed by the set_start_method() call below
             if __name__ == "__main__":
+                if {context!r}: multiprocessing.set_start_method({context!r})
                 t = {executor_type}(max_workers=3)
                 t.submit(sleep_and_print, 1.0, "apple")
                 t.shutdown(wait=False)
-            """.format(executor_type=self.executor_type.__name__))
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, 'ctx', None)))
         self.assertFalse(err)
         self.assertEqual(out.strip(), b"apple")
 
 
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
-    def _prime_executor(self):
-        pass
-
     def test_threads_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):
 
 
 class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def _prime_executor(self):
-        pass
-
     def test_processes_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
 
-        mp_context = get_context()
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
@@ -526,7 +510,8 @@ def acquire_lock(lock):
             p.join()
 
     def test_context_manager_shutdown(self):
-        with futures.ProcessPoolExecutor(max_workers=5) as e:
+        with futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context()) as e:
             processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
             p.join()
 
     def test_del_shutdown(self):
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         executor_manager_thread = executor._executor_manager_thread
         processes = executor._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
     def test_shutdown_no_wait(self):
         # Ensure that the executor cleans up the processes when calling
         # shutdown with wait=False
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
@@ -935,7 +922,7 @@ def submit(pool):
             pool.submit(submit, pool)
 
             for _ in range(50):
-                with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
+                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                     workers.submit(tuple)
 
 
@@ -1005,7 +992,7 @@ def test_traceback(self):
     def test_ressources_gced_in_workers(self):
         # Ensure that argument for a job are correctly gc-ed after the job
         # is finished
-        mgr = get_context(self.ctx).Manager()
+        mgr = self.get_context().Manager()
         obj = EventfulGCObj(mgr)
         future = self.executor.submit(id, obj)
         future.result()
@@ -1021,38 +1008,41 @@ def test_ressources_gced_in_workers(self):
         mgr.join()
 
     def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
+        executor = self.executor
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
+        for _ in range(job_count):
+            executor.submit(sem.acquire)
+        self.assertEqual(len(executor._processes), executor._max_workers)
+        for _ in range(job_count):
+            sem.release()
 
     def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers >= 4
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
         self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()
 
     def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers <= 5
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
         executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
+        executor.submit(mul, 1, 2).result()
+        executor.submit(mul, 0, 9)
+        self.assertLessEqual(len(executor._processes), 3)
         executor.shutdown()
 
     def test_max_tasks_per_child(self):
-        executor = self.executor_type(1, max_tasks_per_child=3)
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+                1, mp_context=self.get_context(), max_tasks_per_child=3)
         f1 = executor.submit(os.getpid)
         original_pid = f1.result()
         # The worker pid remains the same as the worker could be reused
@@ -1072,7 +1062,10 @@ def test_max_tasks_per_child(self):
         executor.shutdown()
 
     def test_max_tasks_early_shutdown(self):
-        executor = self.executor_type(3, max_tasks_per_child=1)
+        # not using self.executor as we need to control construction.
+        # arguably this could go in another class w/o that mixin.
+        executor = self.executor_type(
+                3, mp_context=self.get_context(), max_tasks_per_child=1)
         futures = []
         for i in range(6):
             futures.append(executor.submit(mul, i, i))
@@ -1182,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
         self.executor.shutdown(wait=True)
 
         executor = self.executor_type(
-            max_workers=2, mp_context=get_context(self.ctx))
+            max_workers=2, mp_context=self.get_context())
         res = executor.submit(func, *args)
 
         if ignore_stderr:
@@ -1261,7 +1254,7 @@ def test_shutdown_deadlock(self):
         # if a worker fails after the shutdown call.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock
             f = executor.submit(_crash, delay=.1)
             executor.shutdown(wait=True)
@@ -1274,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self):
         # Reported in bpo-39104.
         self.executor.shutdown(wait=True)
         with self.executor_type(max_workers=2,
-                                mp_context=get_context(self.ctx)) as executor:
+                                mp_context=self.get_context()) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock
 
             # Start the executor and get the executor_manager_thread to collect
diff --git a/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst b/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst
new file mode 100644
index 0000000000000..32839a826a41e
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst
@@ -0,0 +1 @@
+Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.


