[Python-checkins] bpo-39678: refactor queue manager thread (GH-18551)

Thomas Moreau webhook-mailer at python.org
Sun Mar 1 15:49:19 EST 2020


https://github.com/python/cpython/commit/0e89076247580ba0e570c4816f0e5628a7e36e83
commit: 0e89076247580ba0e570c4816f0e5628a7e36e83
branch: master
author: Thomas Moreau <thomas.moreau.2010 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-03-01T21:49:14+01:00
summary:

bpo-39678: refactor queue manager thread (GH-18551)

files:
A Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index d77322831a6c6..39fadcce027c2 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -49,7 +49,6 @@
 import os
 from concurrent.futures import _base
 import queue
-from queue import Full
 import multiprocessing as mp
 import multiprocessing.connection
 from multiprocessing.queues import Queue
@@ -176,8 +175,9 @@ def _on_queue_feeder_error(self, e, obj):
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
             self.thread_wakeup.wakeup()
-            # work_item can be None if another process terminated. In this case,
-            # the queue_manager_thread fails all work_items with BrokenProcessPool
+            # work_item can be None if another process terminated. In this
+            # case, the executor_manager_thread fails all work_items
+            # with BrokenProcessPool
             if work_item is not None:
                 work_item.future.set_exception(e)
         else:
@@ -193,6 +193,7 @@ def _get_chunks(*iterables, chunksize):
             return
         yield chunk
 
+
 def _process_chunk(fn, chunk):
     """ Processes a chunk of an iterable passed to map.
 
@@ -256,122 +257,123 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
         del call_item
 
 
-def _add_call_item_to_queue(pending_work_items,
-                            work_ids,
-                            call_queue):
-    """Fills call_queue with _WorkItems from pending_work_items.
+class _ExecutorManagerThread(threading.Thread):
+    """Manages the communication between this process and the worker processes.
 
-    This function never blocks.
+    The manager is run in a local thread.
 
     Args:
-        pending_work_items: A dict mapping work ids to _WorkItems e.g.
-            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
-        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
-            are consumed and the corresponding _WorkItems from
-            pending_work_items are transformed into _CallItems and put in
-            call_queue.
-        call_queue: A multiprocessing.Queue that will be filled with _CallItems
-            derived from _WorkItems.
+        executor: A reference to the ProcessPoolExecutor that owns
+            this thread. A weakref will be owned by the manager as well as
+            references to internal objects used to introspect the state of
+            the executor.
     """
-    while True:
-        if call_queue.full():
-            return
-        try:
-            work_id = work_ids.get(block=False)
-        except queue.Empty:
-            return
-        else:
-            work_item = pending_work_items[work_id]
-
-            if work_item.future.set_running_or_notify_cancel():
-                call_queue.put(_CallItem(work_id,
-                                         work_item.fn,
-                                         work_item.args,
-                                         work_item.kwargs),
-                               block=True)
-            else:
-                del pending_work_items[work_id]
-                continue
 
+    def __init__(self, executor):
+        # Store references to necessary internals of the executor.
 
-def _queue_management_worker(executor_reference,
-                             processes,
-                             pending_work_items,
-                             work_ids_queue,
-                             call_queue,
-                             result_queue,
-                             thread_wakeup):
-    """Manages the communication between this process and the worker processes.
+        # A _ThreadWakeup to wake the executor_manager_thread from the main
+        # Thread and avoid deadlocks caused by permanently locked queues.
+        self.thread_wakeup = executor._executor_manager_thread_wakeup
 
-    This function is run in a local thread.
+        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
+        # to determine if the ProcessPoolExecutor has been garbage collected
+        # and that the manager can exit.
+        # When the executor gets garbage collected, the weakref callback
+        # will wake up the queue management thread so that it can terminate
+        # if there is no pending work item.
+        def weakref_cb(_, thread_wakeup=self.thread_wakeup):
+            mp.util.debug('Executor collected: triggering callback for'
+                          ' QueueManager wakeup')
+            thread_wakeup.wakeup()
 
-    Args:
-        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
-            this thread. Used to determine if the ProcessPoolExecutor has been
-            garbage collected and that this function can exit.
-        process: A list of the ctx.Process instances used as
-            workers.
-        pending_work_items: A dict mapping work ids to _WorkItems e.g.
-            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
-        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
-        call_queue: A ctx.Queue that will be filled with _CallItems
-            derived from _WorkItems for processing by the process workers.
-        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
-            process workers.
-        thread_wakeup: A _ThreadWakeup to allow waking up the
-            queue_manager_thread from the main Thread and avoid deadlocks
-            caused by permanently locked queues.
-    """
-    executor = None
+        self.executor_reference = weakref.ref(executor, weakref_cb)
 
-    def shutting_down():
-        return (_global_shutdown or executor is None
-                or executor._shutdown_thread)
+        # A list of the ctx.Process instances used as workers.
+        self.processes = executor._processes
 
-    def shutdown_worker():
-        # This is an upper bound on the number of children alive.
-        n_children_alive = sum(p.is_alive() for p in processes.values())
-        n_children_to_stop = n_children_alive
-        n_sentinels_sent = 0
-        # Send the right number of sentinels, to make sure all children are
-        # properly terminated.
-        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
-            for i in range(n_children_to_stop - n_sentinels_sent):
-                try:
-                    call_queue.put_nowait(None)
-                    n_sentinels_sent += 1
-                except Full:
-                    break
-            n_children_alive = sum(p.is_alive() for p in processes.values())
+        # A ctx.Queue that will be filled with _CallItems derived from
+        # _WorkItems for processing by the process workers.
+        self.call_queue = executor._call_queue
 
-        # Release the queue's resources as soon as possible.
-        call_queue.close()
-        call_queue.join_thread()
-        thread_wakeup.close()
-        # If .join() is not called on the created processes then
-        # some ctx.Queue methods may deadlock on Mac OS X.
-        for p in processes.values():
-            p.join()
+        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
+        self.result_queue = executor._result_queue
 
-    result_reader = result_queue._reader
-    wakeup_reader = thread_wakeup._reader
-    readers = [result_reader, wakeup_reader]
+        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
+        self.work_ids_queue = executor._work_ids
 
-    while True:
-        _add_call_item_to_queue(pending_work_items,
-                                work_ids_queue,
-                                call_queue)
+        # A dict mapping work ids to _WorkItems e.g.
+        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
+        self.pending_work_items = executor._pending_work_items
+
+        # Set this thread to be daemonized
+        super().__init__()
+        self.daemon = True
 
+    def run(self):
+        # Main loop for the executor manager thread.
+
+        while True:
+            self.add_call_item_to_queue()
+
+            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
+
+            if is_broken:
+                self.terminate_broken(cause)
+                return
+            if result_item is not None:
+                self.process_result_item(result_item)
+                # Delete reference to result_item to avoid keeping references
+                # while waiting on new results.
+                del result_item
+
+            if self.is_shutting_down():
+                self.flag_executor_shutting_down()
+
+                # Since no new work items can be added, it is safe to shutdown
+                # this thread if there are no pending work items.
+                if not self.pending_work_items:
+                    self.join_executor_internals()
+                    return
+
+    def add_call_item_to_queue(self):
+        # Fills call_queue with _WorkItems from pending_work_items.
+        # This function never blocks.
+        while True:
+            if self.call_queue.full():
+                return
+            try:
+                work_id = self.work_ids_queue.get(block=False)
+            except queue.Empty:
+                return
+            else:
+                work_item = self.pending_work_items[work_id]
+
+                if work_item.future.set_running_or_notify_cancel():
+                    self.call_queue.put(_CallItem(work_id,
+                                                  work_item.fn,
+                                                  work_item.args,
+                                                  work_item.kwargs),
+                                        block=True)
+                else:
+                    del self.pending_work_items[work_id]
+                    continue
+
+    def wait_result_broken_or_wakeup(self):
         # Wait for a result to be ready in the result_queue while checking
         # that all worker processes are still running, or for a wake up
         # signal send. The wake up signals come either from new tasks being
         # submitted, from the executor being shutdown/gc-ed, or from the
         # shutdown of the python interpreter.
-        worker_sentinels = [p.sentinel for p in processes.values()]
+        result_reader = self.result_queue._reader
+        wakeup_reader = self.thread_wakeup._reader
+        readers = [result_reader, wakeup_reader]
+        worker_sentinels = [p.sentinel for p in self.processes.values()]
         ready = mp.connection.wait(readers + worker_sentinels)
 
         cause = None
         is_broken = True
+        result_item = None
         if result_reader in ready:
             try:
                 result_item = result_reader.recv()
@@ -381,97 +383,135 @@ def shutdown_worker():
 
         elif wakeup_reader in ready:
             is_broken = False
-            result_item = None
-        thread_wakeup.clear()
-        if is_broken:
-            # Mark the process pool broken so that submits fail right now.
-            executor = executor_reference()
-            if executor is not None:
-                executor._broken = ('A child process terminated '
-                                    'abruptly, the process pool is not '
-                                    'usable anymore')
-                executor._shutdown_thread = True
-                executor = None
-            bpe = BrokenProcessPool("A process in the process pool was "
-                                    "terminated abruptly while the future was "
-                                    "running or pending.")
-            if cause is not None:
-                bpe.__cause__ = _RemoteTraceback(
-                    f"\n'''\n{''.join(cause)}'''")
-            # All futures in flight must be marked failed
-            for work_id, work_item in pending_work_items.items():
-                work_item.future.set_exception(bpe)
-                # Delete references to object. See issue16284
-                del work_item
-            pending_work_items.clear()
-            # Terminate remaining workers forcibly: the queues or their
-            # locks may be in a dirty state and block forever.
-            for p in processes.values():
-                p.terminate()
-            shutdown_worker()
-            return
+        self.thread_wakeup.clear()
+
+        return result_item, is_broken, cause
+
+    def process_result_item(self, result_item):
+        # Process the received result_item. This can be either the PID of a
+        # worker that exited gracefully or a _ResultItem
+
         if isinstance(result_item, int):
             # Clean shutdown of a worker using its PID
             # (avoids marking the executor broken)
-            assert shutting_down()
-            p = processes.pop(result_item)
+            assert self.is_shutting_down()
+            p = self.processes.pop(result_item)
             p.join()
-            if not processes:
-                shutdown_worker()
+            if not self.processes:
+                self.join_executor_internals()
                 return
-        elif result_item is not None:
-            work_item = pending_work_items.pop(result_item.work_id, None)
+        else:
+            # Received a _ResultItem so mark the future as completed.
+            work_item = self.pending_work_items.pop(result_item.work_id, None)
             # work_item can be None if another process terminated (see above)
             if work_item is not None:
                 if result_item.exception:
                     work_item.future.set_exception(result_item.exception)
                 else:
                     work_item.future.set_result(result_item.result)
-                # Delete references to object. See issue16284
-                del work_item
-            # Delete reference to result_item
-            del result_item
 
-        # Check whether we should start shutting down.
-        executor = executor_reference()
+    def is_shutting_down(self):
+        # Check whether we should start shutting down the executor.
+        executor = self.executor_reference()
         # No more work items can be added if:
         #   - The interpreter is shutting down OR
         #   - The executor that owns this worker has been collected OR
         #   - The executor that owns this worker has been shutdown.
-        if shutting_down():
-            try:
-                # Flag the executor as shutting down as early as possible if it
-                # is not gc-ed yet.
-                if executor is not None:
-                    executor._shutdown_thread = True
-                    # Unless there are pending work items, we have nothing to cancel.
-                    if pending_work_items and executor._cancel_pending_futures:
-                        # Cancel all pending futures and update pending_work_items
-                        # to only have futures that are currently running.
-                        new_pending_work_items = {}
-                        for work_id, work_item in pending_work_items.items():
-                            if not work_item.future.cancel():
-                                new_pending_work_items[work_id] = work_item
-
-                        pending_work_items = new_pending_work_items
-                        # Drain work_ids_queue since we no longer need to
-                        # add items to the call queue.
-                        while True:
-                            try:
-                                work_ids_queue.get_nowait()
-                            except queue.Empty:
-                                break
+        return (_global_shutdown or executor is None
+                or executor._shutdown_thread)
 
-                # Since no new work items can be added, it is safe to shutdown
-                # this thread if there are no pending work items.
-                if not pending_work_items:
-                    shutdown_worker()
-                    return
-            except Full:
-                # This is not a problem: we will eventually be woken up (in
-                # result_queue.get()) and be able to send a sentinel again.
-                pass
-        executor = None
+    def terminate_broken(self, cause):
+        # Terminate the executor because it is in a broken state. The cause
+        # argument can be used to display more information on the error that
+        # led the executor into becoming broken.
+
+        # Mark the process pool broken so that submits fail right now.
+        executor = self.executor_reference()
+        if executor is not None:
+            executor._broken = ('A child process terminated '
+                                'abruptly, the process pool is not '
+                                'usable anymore')
+            executor._shutdown_thread = True
+            executor = None
+
+        # All pending tasks are to be marked failed with the following
+        # BrokenProcessPool error
+        bpe = BrokenProcessPool("A process in the process pool was "
+                                "terminated abruptly while the future was "
+                                "running or pending.")
+        if cause is not None:
+            bpe.__cause__ = _RemoteTraceback(
+                f"\n'''\n{''.join(cause)}'''")
+
+        # Mark pending tasks as failed.
+        for work_id, work_item in self.pending_work_items.items():
+            work_item.future.set_exception(bpe)
+            # Delete references to object. See issue16284
+            del work_item
+        self.pending_work_items.clear()
+
+        # Terminate remaining workers forcibly: the queues or their
+        # locks may be in a dirty state and block forever.
+        for p in self.processes.values():
+            p.terminate()
+
+        # clean up resources
+        self.join_executor_internals()
+
+    def flag_executor_shutting_down(self):
+        # Flag the executor as shutting down and cancel remaining tasks if
+        # requested as early as possible if it is not gc-ed yet.
+        executor = self.executor_reference()
+        if executor is not None:
+            executor._shutdown_thread = True
+            # Cancel pending work items if requested.
+            if executor._cancel_pending_futures:
+                # Cancel all pending futures and update pending_work_items
+                # to only have futures that are currently running.
+                new_pending_work_items = {}
+                for work_id, work_item in self.pending_work_items.items():
+                    if not work_item.future.cancel():
+                        new_pending_work_items[work_id] = work_item
+                self.pending_work_items = new_pending_work_items
+                # Drain work_ids_queue since we no longer need to
+                # add items to the call queue.
+                while True:
+                    try:
+                        self.work_ids_queue.get_nowait()
+                    except queue.Empty:
+                        break
+                # Make sure we do this only once to not waste time looping
+                # on running processes over and over.
+                executor._cancel_pending_futures = False
+
+    def shutdown_workers(self):
+        n_children_to_stop = self.get_n_children_alive()
+        n_sentinels_sent = 0
+        # Send the right number of sentinels, to make sure all children are
+        # properly terminated.
+        while (n_sentinels_sent < n_children_to_stop
+                and self.get_n_children_alive() > 0):
+            for i in range(n_children_to_stop - n_sentinels_sent):
+                try:
+                    self.call_queue.put_nowait(None)
+                    n_sentinels_sent += 1
+                except queue.Full:
+                    break
+
+    def join_executor_internals(self):
+        self.shutdown_workers()
+        # Release the queue's resources as soon as possible.
+        self.call_queue.close()
+        self.call_queue.join_thread()
+        self.thread_wakeup.close()
+        # If .join() is not called on the created processes then
+        # some ctx.Queue methods may deadlock on Mac OS X.
+        for p in self.processes.values():
+            p.join()
+
+    def get_n_children_alive(self):
+        # This is an upper bound on the number of children alive.
+        return sum(p.is_alive() for p in self.processes.values())
 
 
 _system_limits_checked = False
@@ -562,7 +602,7 @@ def __init__(self, max_workers=None, mp_context=None,
         self._initargs = initargs
 
         # Management thread
-        self._queue_management_thread = None
+        self._executor_manager_thread = None
 
         # Map of pids to processes
         self._processes = {}
@@ -576,12 +616,12 @@ def __init__(self, max_workers=None, mp_context=None,
         self._cancel_pending_futures = False
 
         # _ThreadWakeup is a communication channel used to interrupt the wait
-        # of the main loop of queue_manager_thread from another thread (e.g.
+        # of the main loop of executor_manager_thread from another thread (e.g.
         # when calling executor.submit or executor.shutdown). We do not use the
-        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # _result_queue to send wakeup signals to the executor_manager_thread
         # as it could result in a deadlock if a worker process dies with the
         # _result_queue write lock still acquired.
-        self._queue_management_thread_wakeup = _ThreadWakeup()
+        self._executor_manager_thread_wakeup = _ThreadWakeup()
 
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
@@ -591,7 +631,7 @@ def __init__(self, max_workers=None, mp_context=None,
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
             pending_work_items=self._pending_work_items,
-            thread_wakeup=self._queue_management_thread_wakeup)
+            thread_wakeup=self._executor_manager_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
@@ -599,32 +639,14 @@ def __init__(self, max_workers=None, mp_context=None,
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
 
-    def _start_queue_management_thread(self):
-        if self._queue_management_thread is None:
-            # When the executor gets garbarge collected, the weakref callback
-            # will wake up the queue management thread so that it can terminate
-            # if there is no pending work item.
-            def weakref_cb(_,
-                           thread_wakeup=self._queue_management_thread_wakeup):
-                mp.util.debug('Executor collected: triggering callback for'
-                              ' QueueManager wakeup')
-                thread_wakeup.wakeup()
+    def _start_executor_manager_thread(self):
+        if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
             self._adjust_process_count()
-            self._queue_management_thread = threading.Thread(
-                target=_queue_management_worker,
-                args=(weakref.ref(self, weakref_cb),
-                      self._processes,
-                      self._pending_work_items,
-                      self._work_ids,
-                      self._call_queue,
-                      self._result_queue,
-                      self._queue_management_thread_wakeup),
-                name="QueueManagerThread")
-            self._queue_management_thread.daemon = True
-            self._queue_management_thread.start()
-            _threads_wakeups[self._queue_management_thread] = \
-                self._queue_management_thread_wakeup
+            self._executor_manager_thread = _ExecutorManagerThread(self)
+            self._executor_manager_thread.start()
+            _threads_wakeups[self._executor_manager_thread] = \
+                self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
         for _ in range(len(self._processes), self._max_workers):
@@ -654,9 +676,9 @@ def submit(self, fn, /, *args, **kwargs):
             self._work_ids.put(self._queue_count)
             self._queue_count += 1
             # Wake up queue management thread
-            self._queue_management_thread_wakeup.wakeup()
+            self._executor_manager_thread_wakeup.wakeup()
 
-            self._start_queue_management_thread()
+            self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
@@ -694,20 +716,20 @@ def shutdown(self, wait=True, *, cancel_futures=False):
             self._cancel_pending_futures = cancel_futures
             self._shutdown_thread = True
 
-        if self._queue_management_thread:
+        if self._executor_manager_thread:
             # Wake up queue management thread
-            self._queue_management_thread_wakeup.wakeup()
+            self._executor_manager_thread_wakeup.wakeup()
             if wait:
-                self._queue_management_thread.join()
+                self._executor_manager_thread.join()
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
-        self._queue_management_thread = None
+        self._executor_manager_thread = None
         self._call_queue = None
         self._result_queue = None
         self._processes = None
 
-        if self._queue_management_thread_wakeup:
-            self._queue_management_thread_wakeup = None
+        if self._executor_manager_thread_wakeup:
+            self._executor_manager_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
 
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index a7381f9d13eb1..868415ab29916 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -508,15 +508,15 @@ def test_context_manager_shutdown(self):
     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
         res = executor.map(abs, range(-5, 5))
-        queue_management_thread = executor._queue_management_thread
+        executor_manager_thread = executor._executor_manager_thread
         processes = executor._processes
         call_queue = executor._call_queue
-        queue_management_thread = executor._queue_management_thread
+        executor_manager_thread = executor._executor_manager_thread
         del executor
 
         # Make sure that all the executor resources were properly cleaned by
         # the shutdown process
-        queue_management_thread.join()
+        executor_manager_thread.join()
         for p in processes.values():
             p.join()
         call_queue.join_thread()
@@ -532,12 +532,12 @@ def test_shutdown_no_wait(self):
         res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
-        queue_management_thread = executor._queue_management_thread
+        executor_manager_thread = executor._executor_manager_thread
         executor.shutdown(wait=False)
 
         # Make sure that all the executor resources were properly cleaned by
         # the shutdown process
-        queue_management_thread.join()
+        executor_manager_thread.join()
         for p in processes.values():
             p.join()
         call_queue.join_thread()
@@ -1139,11 +1139,11 @@ def test_shutdown_deadlock_pickle(self):
                                 mp_context=get_context(self.ctx)) as executor:
             self.executor = executor  # Allow clean up in fail_on_deadlock
 
-            # Start the executor and get the queue_management_thread to collect
+            # Start the executor and get the executor_manager_thread to collect
             # the threads and avoid dangling thread that should be cleaned up
             # asynchronously.
             executor.submit(id, 42).result()
-            queue_manager = executor._queue_management_thread
+            executor_manager = executor._executor_manager_thread
 
             # Submit a task that fails at pickle and shutdown the executor
             # without waiting
@@ -1154,7 +1154,7 @@ def test_shutdown_deadlock_pickle(self):
 
         # Make sure the executor is eventually shutdown and do not leave
         # dangling threads
-        queue_manager.join()
+        executor_manager.join()
 
 
 create_executor_tests(ExecutorDeadlockTest,
diff --git a/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst
new file mode 100644
index 0000000000000..8b18e2259c5c7
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-02-28-12-59-30.bpo-39678.3idfxM.rst
@@ -0,0 +1,2 @@
+Refactor queue_manager in :class:`concurrent.futures.ProcessPoolExecutor` to
+make it easier to maintain.
\ No newline at end of file



More information about the Python-checkins mailing list