[Python-checkins] cpython: Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.

charles-francois.natali python-checkins at python.org
Tue Apr 17 18:46:44 CEST 2012


http://hg.python.org/cpython/rev/5606ee052783
changeset:   76375:5606ee052783
user:        Charles-François Natali <neologix at free.fr>
date:        Tue Apr 17 18:45:57 2012 +0200
summary:
  Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.

files:
  Doc/library/multiprocessing.rst    |   6 +
  Lib/multiprocessing/managers.py    |  19 +++++
  Lib/multiprocessing/synchronize.py |  19 +++++
  Lib/test/test_multiprocessing.py   |  67 ++++++++++++++++++
  Misc/NEWS                          |   2 +
  5 files changed, 113 insertions(+), 0 deletions(-)


diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -897,6 +897,9 @@
    If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
    object from :mod:`multiprocessing`.
 
+   .. versionchanged:: 3.3
+      The :meth:`wait_for` method was added.
+
 .. class:: Event()
 
    A clone of :class:`threading.Event`.
@@ -1281,6 +1284,9 @@
       If *lock* is supplied then it should be a proxy for a
       :class:`threading.Lock` or :class:`threading.RLock` object.
 
+      .. versionchanged:: 3.3
+         The :meth:`wait_for` method was added.
+
    .. method:: Event()
 
       Create a shared :class:`threading.Event` object and return a proxy for it.
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -48,6 +48,7 @@
 from multiprocessing import Process, current_process, active_children, Pool, util, connection
 from multiprocessing.process import AuthenticationString
 from multiprocessing.forking import exit, Popen, ForkingPickler
+from time import time as _time
 
 #
 # Register some things for pickling
@@ -996,6 +997,24 @@
         return self._callmethod('notify')
     def notify_all(self):
         return self._callmethod('notify_all')
+    def wait_for(self, predicate, timeout=None):
+        result = predicate()
+        if result:
+            return result
+        if timeout is not None:
+            endtime = _time() + timeout
+        else:
+            endtime = None
+            waittime = None
+        while not result:
+            if endtime is not None:
+                waittime = endtime - _time()
+                if waittime <= 0:
+                    break
+            self.wait(waittime)
+            result = predicate()
+        return result
+
 
 class EventProxy(BaseProxy):
     _exposed_ = ('is_set', 'set', 'clear', 'wait')
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -43,6 +43,7 @@
 from multiprocessing.process import current_process
 from multiprocessing.util import register_after_fork, debug
 from multiprocessing.forking import assert_spawning, Popen
+from time import time as _time
 
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
@@ -290,6 +291,24 @@
             while self._wait_semaphore.acquire(False):
                 pass
 
+    def wait_for(self, predicate, timeout=None):
+        result = predicate()
+        if result:
+            return result
+        if timeout is not None:
+            endtime = _time() + timeout
+        else:
+            endtime = None
+            waittime = None
+        while not result:
+            if endtime is not None:
+                waittime = endtime - _time()
+                if waittime <= 0:
+                    break
+            self.wait(waittime)
+            result = predicate()
+        return result
+
 #
 # Event
 #
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -887,6 +887,73 @@
         self.assertEqual(res, False)
         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
+    @classmethod
+    def _test_waitfor_f(cls, cond, state):
+        with cond:
+            state.value = 0
+            cond.notify()
+            result = cond.wait_for(lambda : state.value==4)
+            if not result or state.value != 4:
+                sys.exit(1)
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', -1)
+
+        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
+        p.daemon = True
+        p.start()
+
+        with cond:
+            result = cond.wait_for(lambda : state.value==0)
+            self.assertTrue(result)
+            self.assertEqual(state.value, 0)
+
+        for i in range(4):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertFalse(p.is_alive())
+        self.assertEqual(p.exitcode, 0)
+
+    @classmethod
+    def _test_waitfor_timeout_f(cls, cond, state, success):
+        with cond:
+            expected = 0.1
+            dt = time.time()
+            result = cond.wait_for(lambda : state.value==4, timeout=expected)
+            dt = time.time() - dt
+            # borrow logic in assertTimeout() from test/lock_tests.py
+            if not result and expected * 0.6 < dt < expected * 10.0:
+                success.value = True
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor_timeout(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', 0)
+        success = self.Value('i', False)
+
+        p = self.Process(target=self._test_waitfor_timeout_f,
+                         args=(cond, state, success))
+        p.daemon = True
+        p.start()
+
+        # Only increment 3 times, so state == 4 is never reached.
+        for i in range(3):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertTrue(success.value)
+
 
 class _TestEvent(BaseTestCase):
 
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -39,6 +39,8 @@
 Library
 -------
 
+- Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
+
 - Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message.
 
 - Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list