[Python-checkins] bpo-39812: Remove daemon threads in concurrent.futures (GH-19149)

Kyle Stanley webhook-mailer at python.org
Fri Mar 27 15:31:30 EDT 2020


https://github.com/python/cpython/commit/b61b818d916942aad1f8f3e33181801c4a1ed14b
commit: b61b818d916942aad1f8f3e33181801c4a1ed14b
branch: master
author: Kyle Stanley <aeros167 at gmail.com>
committer: GitHub <noreply at github.com>
date: 2020-03-27T20:31:22+01:00
summary:

bpo-39812: Remove daemon threads in concurrent.futures (GH-19149)

Remove daemon threads from :mod:`concurrent.futures` by adding
an internal `threading._register_atexit()`, which calls registered functions
prior to joining all non-daemon threads. This allows for compatibility
with subinterpreters, which don't support daemon threads.

files:
A Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst
M Doc/whatsnew/3.9.rst
M Lib/concurrent/futures/process.py
M Lib/concurrent/futures/thread.py
M Lib/test/test_threading.py
M Lib/threading.py

diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index 778e443f8d077..a76445b3d8cbb 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -195,6 +195,11 @@ which have not started running, instead of waiting for them to complete before
 shutting down the executor.
 (Contributed by Kyle Stanley in :issue:`39349`.)
 
+Removed daemon threads from :class:`~concurrent.futures.ThreadPoolExecutor`
+and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
+compatibility with subinterpreters and predictability in their shutdown
+processes. (Contributed by Kyle Stanley in :issue:`39812`.)
+
 curses
 ------
 
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 39fadcce027c2..4c39500d675ff 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -59,19 +59,6 @@
 import sys
 import traceback
 
-# Workers are created as daemon threads and processes. This is done to allow the
-# interpreter to exit when there are still idle processes in a
-# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
-# allowing workers to die with the interpreter has two undesirable properties:
-#   - The workers would still be running during interpreter shutdown,
-#     meaning that they would fail in unpredictable ways.
-#   - The workers could be killed while evaluating a work item, which could
-#     be bad if the callable being evaluated has external side-effects e.g.
-#     writing to a file.
-#
-# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads/processes finish.
 
 _threads_wakeups = weakref.WeakKeyDictionary()
 _global_shutdown = False
@@ -107,6 +94,12 @@ def _python_exit():
     for t, _ in items:
         t.join()
 
+# Register `_python_exit()` to be called just before joining all
+# non-daemon threads. This is used instead of `atexit.register()` for
+# compatibility with subinterpreters, which no longer support daemon threads.
+# See bpo-39812 for context.
+threading._register_atexit(_python_exit)
+
 # Controls how many more calls than processes will be queued in the call queue.
 # A smaller number will mean that processes spend more time idle waiting for
 # work while a larger number will make Future.cancel() succeed less frequently
@@ -306,9 +299,7 @@ def weakref_cb(_, thread_wakeup=self.thread_wakeup):
         #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         self.pending_work_items = executor._pending_work_items
 
-        # Set this thread to be daemonized
         super().__init__()
-        self.daemon = True
 
     def run(self):
         # Main loop for the executor manager thread.
@@ -732,5 +723,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
             self._executor_manager_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
-
-atexit.register(_python_exit)
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
index be79161bf8561..2aa4e17d47fa7 100644
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -13,19 +13,6 @@
 import weakref
 import os
 
-# Workers are created as daemon threads. This is done to allow the interpreter
-# to exit when there are still idle threads in a ThreadPoolExecutor's thread
-# pool (i.e. shutdown() was not called). However, allowing workers to die with
-# the interpreter has two undesirable properties:
-#   - The workers would still be running during interpreter shutdown,
-#     meaning that they would fail in unpredictable ways.
-#   - The workers could be killed while evaluating a work item, which could
-#     be bad if the callable being evaluated has external side-effects e.g.
-#     writing to a file.
-#
-# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads finish.
 
 _threads_queues = weakref.WeakKeyDictionary()
 _shutdown = False
@@ -43,7 +30,11 @@ def _python_exit():
     for t, q in items:
         t.join()
 
-atexit.register(_python_exit)
+# Register `_python_exit()` to be called just before joining all
+# non-daemon threads. This is used instead of `atexit.register()` for
+# compatibility with subinterpreters, which no longer support daemon threads.
+# See bpo-39812 for context.
+threading._register_atexit(_python_exit)
 
 
 class _WorkItem(object):
@@ -197,7 +188,6 @@ def weakref_cb(_, q=self._work_queue):
                                        self._work_queue,
                                        self._initializer,
                                        self._initargs))
-            t.daemon = True
             t.start()
             self._threads.add(t)
             _threads_queues[t] = self._work_queue
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index f1037b5d940b0..da17e1281d986 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1397,5 +1397,55 @@ def test_interrupt_main_noerror(self):
             signal.signal(signal.SIGINT, handler)
 
 
+class AtexitTests(unittest.TestCase):
+
+    def test_atexit_output(self):
+        rc, out, err = assert_python_ok("-c", """if True:
+            import threading
+
+            def run_last():
+                print('parrot')
+
+            threading._register_atexit(run_last)
+        """)
+
+        self.assertFalse(err)
+        self.assertEqual(out.strip(), b'parrot')
+
+    def test_atexit_called_once(self):
+        rc, out, err = assert_python_ok("-c", """if True:
+            import threading
+            from unittest.mock import Mock
+
+            mock = Mock()
+            threading._register_atexit(mock)
+            mock.assert_not_called()
+            # force early shutdown to ensure it was called once
+            threading._shutdown()
+            mock.assert_called_once()
+        """)
+
+        self.assertFalse(err)
+
+    def test_atexit_after_shutdown(self):
+        # The only way to test this is to register an atexit function from
+        # within another one, which is expected to raise a RuntimeError.
+        rc, out, err = assert_python_ok("-c", """if True:
+            import threading
+
+            def func():
+                pass
+
+            def run_last():
+                threading._register_atexit(func)
+
+            threading._register_atexit(run_last)
+        """)
+
+        self.assertTrue(err)
+        self.assertIn("RuntimeError: can't register atexit after shutdown",
+                err.decode())
+
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/Lib/threading.py b/Lib/threading.py
index 46eb1b918a9a7..6b25e7a26ed2a 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -3,6 +3,7 @@
 import os as _os
 import sys as _sys
 import _thread
+import functools
 
 from time import monotonic as _time
 from _weakrefset import WeakSet
@@ -1346,6 +1347,27 @@ def enumerate():
     with _active_limbo_lock:
         return list(_active.values()) + list(_limbo.values())
 
+
+_threading_atexits = []
+_SHUTTING_DOWN = False
+
+def _register_atexit(func, *arg, **kwargs):
+    """CPython internal: register *func* to be called before joining threads.
+
+    The registered *func* is called with its arguments just before all
+    non-daemon threads are joined in `_shutdown()`. It serves a similar
+    purpose to `atexit.register()`, but its functions are called prior to
+    threading shutdown instead of interpreter shutdown.
+
+    As with atexit, functions are called in reverse order of registration.
+    """
+    if _SHUTTING_DOWN:
+        raise RuntimeError("can't register atexit after shutdown")
+
+    call = functools.partial(func, *arg, **kwargs)
+    _threading_atexits.append(call)
+
+
 from _thread import stack_size
 
 # Create the main thread object,
@@ -1367,6 +1389,8 @@ def _shutdown():
         # _shutdown() was already called
         return
 
+    global _SHUTTING_DOWN
+    _SHUTTING_DOWN = True
     # Main thread
     tlock = _main_thread._tstate_lock
     # The main thread isn't finished yet, so its thread state lock can't have
@@ -1376,6 +1400,11 @@ def _shutdown():
     tlock.release()
     _main_thread._stop()
 
+    # Call registered threading atexit functions before threads are joined.
+    # Order is reversed, similar to atexit.
+    for atexit_call in reversed(_threading_atexits):
+        atexit_call()
+
     # Join all non-deamon threads
     while True:
         with _shutdown_locks_lock:
diff --git a/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst
new file mode 100644
index 0000000000000..4cea878d0ccb4
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-03-25-00-35-48.bpo-39812.rIKnms.rst
@@ -0,0 +1,4 @@
+Removed daemon threads from :mod:`concurrent.futures` by adding
+an internal `threading._register_atexit()`, which calls registered functions
+prior to joining all non-daemon threads. This allows for compatibility
+with subinterpreters, which don't support daemon threads.
\ No newline at end of file



More information about the Python-checkins mailing list