[Python-checkins] bpo-27144: concurrent.futures as_completed and map iterators do not keep references to returned objects (#1560)

Antoine Pitrou webhook-mailer at python.org
Fri Sep 1 12:54:03 EDT 2017


https://github.com/python/cpython/commit/97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9
commit: 97e1b1c81458d2109b2ffed32ffa1eb643a6c3b9
branch: master
author: Grzegorz Grzywacz <grzgrzgrz3 at gmail.com>
committer: Antoine Pitrou <pitrou at free.fr>
date: 2017-09-01T18:54:00+02:00
summary:

bpo-27144: concurrent.futures as_completed and map iterators do not keep references to returned objects (#1560)

* bpo-27144: concurrent.futures as_completed and map iterators do not keep
references to returned objects

* Some nits.  Improve wording in docstrings and comments, and avoid relying on
sys.getrefcount() in tests.

files:
A Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst
M Lib/concurrent/futures/_base.py
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 295489c93e5..88521ae317e 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -170,6 +170,20 @@ def _create_and_install_waiters(fs, return_when):
 
     return waiter
 
+
+def _yield_and_decref(fs, ref_collect):
+    """
+    Consume the list *fs*, yielding its objects one by one in reverse order.
+    Each object is removed (set.remove) from every set in the collection
+    *ref_collect* and popped from *fs* before it is yielded.
+    """
+    while fs:
+        for futures_set in ref_collect:
+            futures_set.remove(fs[-1])
+        # Pop inside the yield so no local here keeps a reference to it
+        yield fs.pop()
+
+
 def as_completed(fs, timeout=None):
     """An iterator over the given futures that yields each as it completes.
 
@@ -191,6 +205,8 @@ def as_completed(fs, timeout=None):
     if timeout is not None:
         end_time = timeout + time.time()
 
+    total_futures = len(fs)
+
     fs = set(fs)
     with _AcquireFutures(fs):
         finished = set(
@@ -198,9 +214,9 @@ def as_completed(fs, timeout=None):
                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         pending = fs - finished
         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
-
+    finished = list(finished)
     try:
-        yield from finished
+        yield from _yield_and_decref(finished, ref_collect=(fs,))
 
         while pending:
             if timeout is None:
@@ -210,7 +226,7 @@ def as_completed(fs, timeout=None):
                 if wait_timeout < 0:
                     raise TimeoutError(
                             '%d (of %d) futures unfinished' % (
-                            len(pending), len(fs)))
+                            len(pending), total_futures))
 
             waiter.event.wait(wait_timeout)
 
@@ -219,9 +235,9 @@ def as_completed(fs, timeout=None):
                 waiter.finished_futures = []
                 waiter.event.clear()
 
-            for future in finished:
-                yield future
-                pending.remove(future)
+            # reverse to keep finishing order
+            finished.reverse()
+            yield from _yield_and_decref(finished, ref_collect=(fs, pending))
 
     finally:
         for f in fs:
@@ -551,11 +567,14 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         # before the first iterator value is required.
         def result_iterator():
             try:
-                for future in fs:
+                # reverse so that pop() yields futures in their original order
+                fs.reverse()
+                while fs:
+                    # Careful not to keep a reference to the popped future
                     if timeout is None:
-                        yield future.result()
+                        yield fs.pop().result()
                     else:
-                        yield future.result(end_time - time.time())
+                        yield fs.pop().result(end_time - time.time())
             finally:
                 for future in fs:
                     future.cancel()
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 8f1d714193a..03b28ab5d68 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -357,6 +357,18 @@ def _check_system_limits():
     raise NotImplementedError(_system_limited)
 
 
+def _chain_from_iterable_of_lists(iterable):
+    """
+    Specialized implementation of itertools.chain.from_iterable.
+    Each item in *iterable* should be a list; it is emptied in place so
+    the list keeps no reference to an object once it has been yielded.
+    """
+    for element in iterable:
+        element.reverse()  # so pop() from the end yields the original order
+        while element:
+            yield element.pop()
+
+
 class BrokenProcessPool(RuntimeError):
     """
     Raised when a process in a ProcessPoolExecutor terminated abruptly
@@ -482,7 +494,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
         results = super().map(partial(_process_chunk, fn),
                               _get_chunks(*iterables, chunksize=chunksize),
                               timeout=timeout)
-        return itertools.chain.from_iterable(results)
+        return _chain_from_iterable_of_lists(results)
 
     def shutdown(self, wait=True):
         with self._shutdown_lock:
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index ebc30a49e5e..f1226fe7090 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -59,6 +59,10 @@ def my_method(self):
         pass
 
 
+def make_dummy_object(_):
+    return MyObject()  # fresh object on every call; the argument is ignored
+
+
 class BaseTestCase(unittest.TestCase):
     def setUp(self):
         self._thread_key = test.support.threading_setup()
@@ -396,6 +400,38 @@ def test_duplicate_futures(self):
         completed = [f for f in futures.as_completed([future1,future1])]
         self.assertEqual(len(completed), 1)
 
+    def test_free_reference_yielded_future(self):
+        # bpo-27144: Generator should not keep references
+        # to finished futures.
+        futures_list = [Future() for _ in range(8)]
+        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
+        futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
+
+        with self.assertRaises(futures.TimeoutError):
+            for future in futures.as_completed(futures_list, timeout=0):
+                futures_list.remove(future)
+                wr = weakref.ref(future)
+                del future
+                self.assertIsNone(wr())
+
+        futures_list[0].set_result("test")
+        for future in futures.as_completed(futures_list):
+            futures_list.remove(future)
+            wr = weakref.ref(future)
+            del future
+            self.assertIsNone(wr())
+            if futures_list:
+                futures_list[0].set_result("test")
+
+    def test_correct_timeout_exception_msg(self):
+        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
+                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]
+        # PENDING and RUNNING are unfinished; the total counts all 4 futures.
+        with self.assertRaises(futures.TimeoutError) as cm:
+            list(futures.as_completed(futures_list, timeout=0))
+
+        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
+
 
 class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase):
     pass
@@ -421,6 +457,10 @@ def test_map(self):
                 list(self.executor.map(pow, range(10), range(10))),
                 list(map(pow, range(10), range(10))))
 
+        self.assertEqual(
+                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
+                list(map(pow, range(10), range(10))))
+
     def test_map_exception(self):
         i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
         self.assertEqual(i.__next__(), (0, 1))
@@ -471,6 +511,14 @@ def test_max_workers_negative(self):
                                         "than 0"):
                 self.executor_type(max_workers=number)
 
+    def test_free_reference(self):
+        # bpo-27144: Result iterator should not keep an internal
+        # reference to result objects.
+        for obj in self.executor.map(make_dummy_object, range(10)):
+            wr = weakref.ref(obj)
+            del obj
+            self.assertIsNone(wr())
+
 
 class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
     def test_map_submits_without_iteration(self):
diff --git a/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst
new file mode 100644
index 00000000000..857fad0c852
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2017-08-30-11-26-14.bpo-27144.PEDJsE.rst
@@ -0,0 +1,2 @@
+The ``map()`` and ``as_completed()`` iterators in ``concurrent.futures``
+now avoid keeping a reference to yielded objects.
\ No newline at end of file



More information about the Python-checkins mailing list