[Python-checkins] [3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)" (GH-10969)
Victor Stinner
webhook-mailer at python.org
Wed Dec 5 19:49:38 EST 2018
https://github.com/python/cpython/commit/eb38ee052e2273568d0041e969aa851ee44e43ce
commit: eb38ee052e2273568d0041e969aa851ee44e43ce
branch: 3.6
author: Victor Stinner <vstinner at redhat.com>
committer: GitHub <noreply at github.com>
date: 2018-12-06T01:49:34+01:00
summary:
[3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after being deleted (GH-8450) (GH-9677)" (GH-10969)
This reverts commit 07b96a95db78eff3557d1bfed1df9ebecc40815b.
files:
A Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst
M Lib/multiprocessing/pool.py
M Lib/test/_test_multiprocessing.py
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 32254d8ea6cf..a545f3c1a189 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -147,9 +147,8 @@ class Pool(object):
'''
_wrap_exception = True
- @staticmethod
- def Process(ctx, *args, **kwds):
- return ctx.Process(*args, **kwds)
+ def Process(self, *args, **kwds):
+ return self._ctx.Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, context=None):
@@ -176,15 +175,13 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
- args=(self._cache, self._taskqueue, self._ctx, self.Process,
- self._processes, self._pool, self._inqueue, self._outqueue,
- self._initializer, self._initargs, self._maxtasksperchild,
- self._wrap_exception)
+ args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
+
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
@@ -210,62 +207,43 @@ def __init__(self, processes=None, initializer=None, initargs=(),
exitpriority=15
)
- @staticmethod
- def _join_exited_workers(pool):
+ def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
- for i in reversed(range(len(pool))):
- worker = pool[i]
+ for i in reversed(range(len(self._pool))):
+ worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
- del pool[i]
+ del self._pool[i]
return cleaned
def _repopulate_pool(self):
- return self._repopulate_pool_static(self._ctx, self.Process,
- self._processes,
- self._pool, self._inqueue,
- self._outqueue, self._initializer,
- self._initargs,
- self._maxtasksperchild,
- self._wrap_exception)
-
- @staticmethod
- def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
- for i in range(processes - len(pool)):
- w = Process(ctx, target=worker,
- args=(inqueue, outqueue,
- initializer,
- initargs, maxtasksperchild,
- wrap_exception)
- )
- pool.append(w)
+ for i in range(self._processes - len(self._pool)):
+ w = self.Process(target=worker,
+ args=(self._inqueue, self._outqueue,
+ self._initializer,
+ self._initargs, self._maxtasksperchild,
+ self._wrap_exception)
+ )
+ self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
util.debug('added worker')
- @staticmethod
- def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
- initializer, initargs, maxtasksperchild,
- wrap_exception):
+ def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
"""
- if Pool._join_exited_workers(pool):
- Pool._repopulate_pool_static(ctx, Process, processes, pool,
- inqueue, outqueue, initializer,
- initargs, maxtasksperchild,
- wrap_exception)
+ if self._join_exited_workers():
+ self._repopulate_pool()
def _setup_queues(self):
self._inqueue = self._ctx.SimpleQueue()
@@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
return result
@staticmethod
- def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
- inqueue, outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception):
+ def _handle_workers(pool):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
- while thread._state == RUN or (cache and thread._state != TERMINATE):
- Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
- outqueue, initializer, initargs,
- maxtasksperchild, wrap_exception)
+ while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
+ pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
- taskqueue.put(None)
+ pool._taskqueue.put(None)
util.debug('worker handler exiting')
@staticmethod
@@ -807,7 +781,7 @@ class ThreadPool(Pool):
_wrap_exception = False
@staticmethod
- def Process(ctx, *args, **kwds):
+ def Process(*args, **kwds):
from .dummy import Process
return Process(*args, **kwds)
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 59f9a2e1e2eb..6cafc2e9cbc0 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2286,13 +2286,6 @@ def test_release_task_refs(self):
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)
- @support.reap_threads
- def test_del_pool(self):
- p = self.Pool(1)
- wr = weakref.ref(p)
- del p
- gc.collect()
- self.assertIsNone(wr())
def raising():
raise KeyError("key")
diff --git a/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst b/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst
new file mode 100644
index 000000000000..e467cc967825
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-12-06-00-31-25.bpo-34172.l7CIYt.rst
@@ -0,0 +1,3 @@
+REVERT: Fix a reference issue inside multiprocessing.Pool that caused the
+pool to remain alive if it was deleted without being closed or terminated
+explicitly.
More information about the Python-checkins mailing list