[Python-checkins] cpython: Fix #8684: make sched.scheduler class thread-safe

giampaolo.rodola python-checkins at python.org
Wed Dec 14 13:34:40 CET 2011


http://hg.python.org/cpython/rev/f5aed0dba844
changeset:   73959:f5aed0dba844
user:        Giampaolo Rodola' <g.rodola at gmail.com>
date:        Wed Dec 14 13:34:26 2011 +0100
summary:
  Fix #8684: make sched.scheduler class thread-safe

files:
  Doc/library/sched.rst |  30 +-------------
  Doc/whatsnew/3.3.rst  |   4 ++
  Lib/sched.py          |  62 +++++++++++++++++-------------
  Misc/NEWS             |   3 +
  4 files changed, 45 insertions(+), 54 deletions(-)


diff --git a/Doc/library/sched.rst b/Doc/library/sched.rst
--- a/Doc/library/sched.rst
+++ b/Doc/library/sched.rst
@@ -27,6 +27,9 @@
 
    .. versionchanged:: 3.3
       *timefunc* and *delayfunc* parameters are optional.
+   .. versionchanged:: 3.3
+      :class:`scheduler` class can be safely used in multi-threaded
+      environments.
 
 Example::
 
@@ -47,33 +50,6 @@
    From print_time 930343700.273
    930343700.276
 
-In multi-threaded environments, the :class:`scheduler` class has limitations
-with respect to thread-safety, inability to insert a new task before
-the one currently pending in a running scheduler, and holding up the main
-thread until the event queue is empty.  Instead, the preferred approach
-is to use the :class:`threading.Timer` class instead.
-
-Example::
-
-    >>> import time
-    >>> from threading import Timer
-    >>> def print_time():
-    ...     print("From print_time", time.time())
-    ...
-    >>> def print_some_times():
-    ...     print(time.time())
-    ...     Timer(5, print_time, ()).start()
-    ...     Timer(10, print_time, ()).start()
-    ...     time.sleep(11)  # sleep while time-delay events execute
-    ...     print(time.time())
-    ...
-    >>> print_some_times()
-    930343690.257
-    From print_time 930343695.274
-    From print_time 930343700.273
-    930343701.301
-
-
 .. _scheduler-objects:
 
 Scheduler Objects
diff --git a/Doc/whatsnew/3.3.rst b/Doc/whatsnew/3.3.rst
--- a/Doc/whatsnew/3.3.rst
+++ b/Doc/whatsnew/3.3.rst
@@ -662,6 +662,10 @@
 sched
 -----
 
+* :class:`~sched.scheduler` class can now be safely used in multi-threaded
+  environments.  (Contributed by Josiah Carlson and Giampaolo Rodolà in
+  :issue:`8684`)
+
 * *timefunc* and *delayfunc* parameters of :class:`~sched.scheduler` class
   constructor are now optional and default to :func:`time.time` and
   :func:`time.sleep` respectively.  (Contributed by Chris Clark in
diff --git a/Lib/sched.py b/Lib/sched.py
--- a/Lib/sched.py
+++ b/Lib/sched.py
@@ -30,6 +30,7 @@
 
 import time
 import heapq
+import threading
 from collections import namedtuple
 
 __all__ = ["scheduler"]
@@ -48,6 +49,7 @@
         """Initialize a new instance, passing the time and delay
         functions"""
         self._queue = []
+        self._lock = threading.RLock()
         self.timefunc = timefunc
         self.delayfunc = delayfunc
 
@@ -58,9 +60,10 @@
         if necessary.
 
         """
-        event = Event(time, priority, action, argument, kwargs)
-        heapq.heappush(self._queue, event)
-        return event # The ID
+        with self._lock:
+            event = Event(time, priority, action, argument, kwargs)
+            heapq.heappush(self._queue, event)
+            return event # The ID
 
     def enter(self, delay, priority, action, argument=[], kwargs={}):
         """A variant that specifies the time as a relative time.
@@ -68,8 +71,9 @@
         This is actually the more commonly used interface.
 
         """
-        time = self.timefunc() + delay
-        return self.enterabs(time, priority, action, argument, kwargs)
+        with self._lock:
+            time = self.timefunc() + delay
+            return self.enterabs(time, priority, action, argument, kwargs)
 
     def cancel(self, event):
         """Remove an event from the queue.
@@ -78,12 +82,14 @@
         If the event is not in the queue, this raises ValueError.
 
         """
-        self._queue.remove(event)
-        heapq.heapify(self._queue)
+        with self._lock:
+            self._queue.remove(event)
+            heapq.heapify(self._queue)
 
     def empty(self):
         """Check whether the queue is empty."""
-        return not self._queue
+        with self._lock:
+            return not self._queue
 
     def run(self):
         """Execute events until the queue is empty.
@@ -108,24 +114,25 @@
         """
         # localize variable access to minimize overhead
         # and to improve thread safety
-        q = self._queue
-        delayfunc = self.delayfunc
-        timefunc = self.timefunc
-        pop = heapq.heappop
-        while q:
-            time, priority, action, argument, kwargs = checked_event = q[0]
-            now = timefunc()
-            if now < time:
-                delayfunc(time - now)
-            else:
-                event = pop(q)
-                # Verify that the event was not removed or altered
-                # by another thread after we last looked at q[0].
-                if event is checked_event:
-                    action(*argument, **kwargs)
-                    delayfunc(0)   # Let other threads run
+        with self._lock:
+            q = self._queue
+            delayfunc = self.delayfunc
+            timefunc = self.timefunc
+            pop = heapq.heappop
+            while q:
+                time, priority, action, argument, kwargs = checked_event = q[0]
+                now = timefunc()
+                if now < time:
+                    delayfunc(time - now)
                 else:
-                    heapq.heappush(q, event)
+                    event = pop(q)
+                    # Verify that the event was not removed or altered
+                    # by another thread after we last looked at q[0].
+                    if event is checked_event:
+                        action(*argument, **kwargs)
+                        delayfunc(0)   # Let other threads run
+                    else:
+                        heapq.heappush(q, event)
 
     @property
     def queue(self):
@@ -138,5 +145,6 @@
         # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
         # With heapq, two events scheduled at the same time will show in
         # the actual order they would be retrieved.
-        events = self._queue[:]
-        return map(heapq.heappop, [events]*len(events))
+        with self._lock:
+            events = self._queue[:]
+            return map(heapq.heappop, [events]*len(events))
diff --git a/Misc/NEWS b/Misc/NEWS
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -409,6 +409,9 @@
 Library
 -------
 
+- Issue #8684: sched.scheduler class can be safely used in multi-threaded
+  environments.
+
 - Alias resource.error to OSError ala PEP 3151.
 
 - Issue #5689: Add support for lzma compression to the tarfile module.
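
A minimal sketch of the usage this changeset is meant to support: several
threads entering events into one shared sched.scheduler instance, with a
single thread then draining the queue. The helper names
(schedule_from_threads, record, producer) and the thread and event counts are
hypothetical and chosen only for illustration; the sketch relies only on the
public enter() and run() methods and on the constructor defaults mentioned in
the 3.3 "What's New" entry above.

```python
import sched
import threading


def schedule_from_threads(n_threads=4, events_per_thread=50):
    """Enter events into one shared scheduler from several threads, then run it.

    Hypothetical helper for illustration; not part of the sched module.
    """
    # timefunc and delayfunc are optional as of Python 3.3.
    scheduler = sched.scheduler()
    results = []
    results_lock = threading.Lock()

    def record(tag):
        # Scheduled action: remember which event fired.
        with results_lock:
            results.append(tag)

    def producer(thread_id):
        # Each producer thread pushes its own events onto the shared queue.
        for i in range(events_per_thread):
            scheduler.enter(0, 1, record, argument=((thread_id, i),))

    threads = [threading.Thread(target=producer, args=(t,))
               for t in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All producers have finished, so run() drains the queue in this thread.
    scheduler.run()
    return results


if __name__ == "__main__":
    fired = schedule_from_threads()
    print(len(fired), "events executed")
```

The producers are joined before run() is called on purpose: in the patch
above, run() holds the scheduler lock for its whole loop, including while
delayfunc sleeps, so a concurrent enter() from another thread would appear to
block until run() returns.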

-- 
Repository URL: http://hg.python.org/cpython

