[Python-checkins] cpython: Issue #16284: Prevent keeping unnecessary references to worker functions in

andrew.svetlov python-checkins at python.org
Sat Nov 3 14:36:12 CET 2012


http://hg.python.org/cpython/rev/70cef0a160cf
changeset:   80188:70cef0a160cf
user:        Andrew Svetlov <andrew.svetlov at gmail.com>
date:        Sat Nov 03 15:36:01 2012 +0200
summary:
  Issue #16284: Prevent keeping unnecessary references to worker functions in concurrent.futures ThreadPoolExecutor.

files:
  Lib/concurrent/futures/process.py   |   4 +++
  Lib/concurrent/futures/thread.py    |   2 +
  Lib/multiprocessing/queues.py       |   4 +++
  Lib/test/test_concurrent_futures.py |  22 +++++++++++++++++
  Misc/ACKS                           |   1 +
  Misc/NEWS                           |   3 ++
  6 files changed, 36 insertions(+), 0 deletions(-)


diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -240,6 +240,8 @@
                         "terminated abruptly while the future was "
                         "running or pending."
                     ))
+                # Delete references to object. See issue16284
+                del work_item
             pending_work_items.clear()
             # Terminate remaining workers forcibly: the queues or their
             # locks may be in a dirty state and block forever.
@@ -264,6 +266,8 @@
                     work_item.future.set_exception(result_item.exception)
                 else:
                     work_item.future.set_result(result_item.result)
+                # Delete references to object. See issue16284
+                del work_item
         # Check whether we should start shutting down.
         executor = executor_reference()
         # No more work items can be added if:
diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py
--- a/Lib/concurrent/futures/thread.py
+++ b/Lib/concurrent/futures/thread.py
@@ -63,6 +63,8 @@
             work_item = work_queue.get(block=True)
             if work_item is not None:
                 work_item.run()
+                # Delete references to object. See issue16284
+                del work_item
                 continue
             executor = executor_reference()
             # Exit if:
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -243,10 +243,14 @@
 
                         if wacquire is None:
                             send(obj)
+                            # Delete references to object. See issue16284
+                            del obj
                         else:
                             wacquire()
                             try:
                                 send(obj)
+                                # Delete references to object. See issue16284
+                                del obj
                             finally:
                                 wrelease()
                 except IndexError:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -15,6 +15,7 @@
 import threading
 import time
 import unittest
+import weakref
 
 from concurrent import futures
 from concurrent.futures._base import (
@@ -52,6 +53,11 @@
     sys.stdout.flush()
 
 
+class MyObject(object):
+    def my_method(self):
+        pass
+
+
 class ExecutorMixin:
     worker_count = 5
 
@@ -396,6 +402,22 @@
         self.executor.map(str, [2] * (self.worker_count + 1))
         self.executor.shutdown()
 
+    @test.support.cpython_only
+    def test_no_stale_references(self):
+        # Issue #16284: check that the executors don't unnecessarily hang onto
+        # references.
+        my_object = MyObject()
+        my_object_collected = threading.Event()
+        my_object_callback = weakref.ref(
+            my_object, lambda obj: my_object_collected.set())
+        # Deliberately discarding the future.
+        self.executor.submit(my_object.my_method)
+        del my_object
+
+        collected = my_object_collected.wait(timeout=5.0)
+        self.assertTrue(collected,
+                        "Stale reference not collected within timeout.")
+
 
 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
     def test_map_submits_without_iteration(self):
diff --git a/Misc/ACKS b/Misc/ACKS
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -726,6 +726,7 @@
 Loren Luke
 Fredrik Lundh
 Mark Lutz
+Taras Lyapun
 Jim Lynch
 Mikael Lyngvig
 Martin von Löwis
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -88,6 +88,9 @@
 Library
 -------
 
+- Issue #16284: Prevent keeping unnecessary references to worker functions
+  in concurrent.futures ThreadPoolExecutor.
+
 - Issue #1207589: Add Cut/Copy/Paste items to IDLE right click Context Menu
   Patch by Todd Rovito.
 

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list