[Python-checkins] bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit (GH-6144) (GH-6445)
Antoine Pitrou
webhook-mailer at python.org
Tue Apr 10 14:35:12 EDT 2018
https://github.com/python/cpython/commit/b26265900a18a184997c3c3a1fa6a5bf29703ec9
commit: b26265900a18a184997c3c3a1fa6a5bf29703ec9
branch: 3.7
author: Miss Islington (bot) <31488909+miss-islington at users.noreply.github.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2018-04-10T20:35:02+02:00
summary:
bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit (GH-6144) (GH-6445)
Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError.
(cherry picked from commit c4b695f85e141f57d22d8edf7bc2c756da136918)
Co-authored-by: Mark Nemec <mrknmc at me.com>
files:
A Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst
M Lib/concurrent/futures/process.py
M Lib/concurrent/futures/thread.py
M Lib/test/test_concurrent_futures.py
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 63f22cfca325..ce7d642b098a 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -423,6 +423,10 @@ def shutdown_worker():
# - The executor that owns this worker has been shutdown.
if shutting_down():
try:
+ # Flag the executor as shutting down as early as possible if it
+ # is not gc-ed yet.
+ if executor is not None:
+ executor._shutdown_thread = True
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
@@ -595,6 +599,9 @@ def submit(self, fn, *args, **kwargs):
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
+ if _global_shutdown:
+ raise RuntimeError('cannot schedule new futures after '
+ 'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index 6e22950a157d..b65dee11f727 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs):
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
+ # Flag the executor as shutting down as early as possible if it
+ # is not gc-ed yet.
+ if executor is not None:
+ executor._shutdown = True
# Notice other workers
work_queue.put(None)
return
@@ -145,6 +149,9 @@ def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
+ if _shutdown:
+ raise RuntimeError('cannot schedule new futures after'
+ 'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 18d0265f3f61..b258a0eafde6 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -303,6 +303,34 @@ def test_interpreter_shutdown(self):
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
+ def test_submit_after_interpreter_shutdown(self):
+ # Test the atexit hook for shutdown of worker threads and processes
+ rc, out, err = assert_python_ok('-c', """if 1:
+ import atexit
+ @atexit.register
+ def run_last():
+ try:
+ t.submit(id, None)
+ except RuntimeError:
+ print("runtime-error")
+ raise
+ from concurrent.futures import {executor_type}
+ if __name__ == "__main__":
+ context = '{context}'
+ if not context:
+ t = {executor_type}(5)
+ else:
+ from multiprocessing import get_context
+ context = get_context(context)
+ t = {executor_type}(5, mp_context=context)
+ t.submit(id, 42).result()
+ """.format(executor_type=self.executor_type.__name__,
+ context=getattr(self, "ctx", "")))
+ # Errors in atexit hooks don't change the process exit code, check
+ # stderr manually.
+ self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
+ self.assertEqual(out.strip(), b"runtime-error")
+
def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
diff --git a/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst
new file mode 100644
index 000000000000..d9411eb51623
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst
@@ -0,0 +1,2 @@
+Raise RuntimeError when ``executor.submit`` is called during interpreter
+shutdown.
More information about the Python-checkins
mailing list