[issue995907] memory leak with threads and enhancement of the timer class

Charles-François Natali report at bugs.python.org
Fri May 10 19:08:21 CEST 2013


Charles-François Natali added the comment:

I'm attaching a proof of concept code for a ScheduledExecutor
interface, and a ScheduledThreadPoolExecutor implementation
(unfortunately I can't upload it as a mercurial diff for now).

Here's what the API looks like:

"""
from concurrent.futures import ScheduledThreadPoolExecutor
import time

def say(text):
    print("{}: {}".format(time.ctime(), text))

with ScheduledThreadPoolExecutor(5) as p:
    p.schedule(1, say, 'hello 1')
    f = p.schedule_fixed_rate(0, 2, say, 'hello 2')
    p.schedule_fixed_delay(0, 3, say, 'hello 3')
    time.sleep(6)
    say("cancelling: %s" % f)
    f.cancel()
    time.sleep(10)
    say("shutting down")
"""

schedule() is for one-shot, schedule_fixed_rate() for fixed rate
scheduling (i.e. there will be no drift due to the task execution
time), and schedule_fixed_delay() is for fixed delay (i.e. there will
always be a fixed amount of time between two invocations).

Random notes:
- the scheduling is handled by a new SchedQueue in the queue module:
the sched module would have been useful, but it can't actually be used
here: it stops as soon as the queue is empty, and while it is blocked
in its wait function it won't wake up if a new task is enqueued, etc.
Also, I guess such a queue could be useful in general.
- I had to create a DelayedFuture subclass, which is returned by the
schedule_XXX methods. The main differences from a raw Future are that
it has scheduled time and period attributes, and supports
reinitialization (a raw Future can only be run once). It can be
cancelled, and also supports result/exception retrieval.
- I don't know if a process-based counterpart
(ScheduledProcessPoolExecutor) is really useful. I didn't look at it
for now.

----------
Added file: http://bugs.python.org/file30199/scheduled.diff
Added file: http://bugs.python.org/file30200/test.py

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue995907>
_______________________________________
-------------- next part --------------
diff -ur cpython.orig/Lib/concurrent/futures/_base.py cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py
--- cpython.orig/Lib/concurrent/futures/_base.py	2013-05-07 08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/_base.py	2013-05-10 16:35:16.000000000 +0000
@@ -6,7 +6,10 @@
 import collections
 import logging
 import threading
-import time
+try:
+    from time import monotonic as time
+except ImportError:
+    from time import time
 
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -188,7 +191,7 @@
             before the given timeout.
     """
     if timeout is not None:
-        end_time = timeout + time.time()
+        end_time = timeout + time()
 
     with _AcquireFutures(fs):
         finished = set(
@@ -204,7 +207,7 @@
             if timeout is None:
                 wait_timeout = None
             else:
-                wait_timeout = end_time - time.time()
+                wait_timeout = end_time - time()
                 if wait_timeout < 0:
                     raise TimeoutError(
                             '%d (of %d) futures unfinished' % (
@@ -390,11 +393,11 @@
             elif self._state == FINISHED:
                 return self.__get_result()
 
-            self._condition.wait(timeout)
+            gotit = self._condition.wait(timeout)
 
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
-            elif self._state == FINISHED:
+            elif gotit:
                 return self.__get_result()
             else:
                 raise TimeoutError()
@@ -423,11 +426,11 @@
             elif self._state == FINISHED:
                 return self._exception
 
-            self._condition.wait(timeout)
+            gotit = self._condition.wait(timeout)
 
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
-            elif self._state == FINISHED:
+            elif gotit:
                 return self._exception
             else:
                 raise TimeoutError()
@@ -499,6 +502,39 @@
             self._condition.notify_all()
         self._invoke_callbacks()
 
+
+class DelayedFuture(Future):
+    """A future whose execution can be delayed, and periodic."""
+
+    def __init__(self, sched_time, period=0, delay=0):
+        super().__init__()
+        self._sched_time = sched_time
+        if period > 0:
+            # step > 0 => fixed rate
+            self._step = period
+        elif delay > 0:
+            # step < 0 => fixed delay
+            self._step = -delay
+        else:
+            # step == 0 => one-shot
+            self._step = 0
+
+    def is_periodic(self):
+        return self._step != 0
+
+    def get_sched_time(self):
+        return self._sched_time
+
+    def rearm(self):
+        """Re-arm the future, and update its scheduled execution time."""
+        with self._condition:
+            if self._step > 0:
+                self._sched_time += self._step
+            else:
+                self._sched_time = time() - self._step
+            self._state = PENDING
+
+
 class Executor(object):
     """This is an abstract base class for concrete asynchronous executors."""
 
@@ -532,7 +568,7 @@
             Exception: If fn(*args) raises for any values.
         """
         if timeout is not None:
-            end_time = timeout + time.time()
+            end_time = timeout + time()
 
         fs = [self.submit(fn, *args) for args in zip(*iterables)]
 
@@ -544,7 +580,7 @@
                     if timeout is None:
                         yield future.result()
                     else:
-                        yield future.result(end_time - time.time())
+                        yield future.result(end_time - time())
             finally:
                 for future in fs:
                     future.cancel()
@@ -569,3 +605,15 @@
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.shutdown(wait=True)
         return False
+
+
+class ScheduledExecutor(Executor):
+
+    def schedule(self, delay, fn, *args, **kwargs):
+        raise NotImplementedError()
+
+    def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+        raise NotImplementedError()
+
+    def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+        raise NotImplementedError()
diff -ur cpython.orig/Lib/concurrent/futures/__init__.py cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py
--- cpython.orig/Lib/concurrent/futures/__init__.py	2013-05-07 08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/__init__.py	2013-05-10 15:29:04.000000000 +0000
@@ -15,4 +15,4 @@
                                       wait,
                                       as_completed)
 from concurrent.futures.process import ProcessPoolExecutor
-from concurrent.futures.thread import ThreadPoolExecutor
+from concurrent.futures.thread import ScheduledThreadPoolExecutor, ThreadPoolExecutor
diff -ur cpython.orig/Lib/concurrent/futures/thread.py cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py
--- cpython.orig/Lib/concurrent/futures/thread.py	2013-05-07 08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/concurrent/futures/thread.py	2013-05-10 16:37:16.000000000 +0000
@@ -9,6 +9,10 @@
 from concurrent.futures import _base
 import queue
 import threading
+try:
+    from time import monotonic as time
+except ImportError:
+    from time import time
 import weakref
 
 # Workers are created as daemon threads. This is done to allow the interpreter
@@ -57,6 +61,39 @@
         else:
             self.future.set_result(result)
 
+class _DelayedWorkItem(_WorkItem):
+
+    def __init__(self, queue, future, fn, args, kwargs):
+        super().__init__(future, fn, args, kwargs)
+        self.queue = queue
+
+    def run(self):
+        if not self.future.set_running_or_notify_cancel():
+            return
+
+        try:
+            result = self.fn(*self.args, **self.kwargs)
+        except BaseException as e:
+            self.future.set_exception(e)
+        else:
+            self.future.set_result(result)
+            if self.future.is_periodic():
+                # rearm the future - it also updates it scheduled time
+                self.future.rearm()
+                # and re-schedule ourselves - XXX don't reschedule if the pool
+                # is shut down
+                self.queue.put(self)
+                
+
+class _DelayedWorkItemQueue(queue.SchedQueue):
+
+    def put(self, w, block=True, timeout=None):
+        if w is not None:
+            super().put_abs(w.future.get_sched_time(), w, block, timeout)
+        else:
+            super().put_abs(0, None, block, timeout)
+
+
 def _worker(executor_reference, work_queue):
     try:
         while True:
@@ -130,3 +167,31 @@
             for t in self._threads:
                 t.join()
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+class ScheduledThreadPoolExecutor(ThreadPoolExecutor):
+    def __init__(self, max_workers):
+        super().__init__(max_workers)
+        self._work_queue = _DelayedWorkItemQueue()
+
+    def schedule(self, init_delay, fn, *args, **kwargs):
+        f = _base.DelayedFuture(time() + init_delay)
+        return self._schedule(f, fn, *args, **kwargs)
+
+    def schedule_fixed_rate(self, init_delay, period, fn, *args, **kwargs):
+        f = _base.DelayedFuture(time() + init_delay, period=period)
+        return self._schedule(f, fn, *args, **kwargs)
+
+    def schedule_fixed_delay(self, init_delay, delay, fn, *args, **kwargs):
+        f = _base.DelayedFuture(time() + init_delay, delay=delay)
+        return self._schedule(f, fn, *args, **kwargs)
+
+    def _schedule(self, f, fn, *args, **kwargs):
+        with self._shutdown_lock:
+            if self._shutdown:
+                raise RuntimeError('cannot schedule new futures after shutdown')
+
+            w = _DelayedWorkItem(self._work_queue, f, fn, args, kwargs)
+
+            self._work_queue.put(w)
+            self._adjust_thread_count()
+            return f
--- cpython.orig/Lib/queue.py	2013-05-07 08:21:21.000000000 +0000
+++ cpython-b3e1be1493a5/Lib/queue.py	2013-05-10 16:32:50.000000000 +0000
@@ -247,3 +247,71 @@
 
     def _get(self):
         return self.queue.pop()
+
+
+class SchedQueue(PriorityQueue):
+    '''Variant of Queue that retrieves open entries as their deadline expire.
+    '''
+
+    def put(self, delay, item, block=True, timeout=None):
+        self.put_abs(time() + delay, item, block, timeout)
+
+    def put_abs(self, deadline, item, block=True, timeout=None):
+        super().put((deadline, item), block, timeout)
+
+    def _put(self, data):
+        deadline, item = data
+
+        do_notify = False
+        if self.queue:
+            earliest_deadline, _ = self.queue[0]
+            if deadline < earliest_deadline:
+                do_notify = True
+        else:
+            do_notify = True
+
+        heappush(self.queue, (deadline, item))
+
+        if do_notify:
+            self.not_empty.notify_all()
+
+    def get(self, block=True, timeout=None):
+        with self.not_empty:
+            if not self.queue and not block:
+                raise Empty
+            if timeout is not None:
+                if timeout < 0:
+                    raise ValueError("'timeout' must be a positive number")
+                else:
+                    timeout_deadline = time() + timeout
+
+            while True:
+                if not self.queue:
+                    if timeout is None:
+                        self.not_empty.wait()
+                    else:
+                        delay = timeout_deadline - time()
+                        if delay > 0:
+                            self.not_empty.wait(delay)
+                        else:
+                            raise Empty
+                else:
+                    deadline, item = self.queue[0]
+                    now = time()
+
+                    if now >= deadline:
+                        heappop(self.queue)
+                        if self.queue:
+                            self.not_empty.notify_all()
+                        return item
+                    elif not block:
+                        raise Empty
+                    elif timeout is None:
+                        self.not_empty.wait(deadline - now)
+                    else:
+                        deadline = min(deadline, timeout_deadline)
+                        delay = deadline - now
+                        if delay > 0:
+                            self.not_empty.wait(delay)
+                        else:
+                            raise Empty
-------------- next part --------------
from concurrent.futures import ScheduledThreadPoolExecutor
import time


def say(text):
    """Print *text* on one line, prefixed by the current time.ctime() stamp."""
    stamp = time.ctime()
    print("{}: {}".format(stamp, text))


# Demo of the proposed scheduling API on a pool created with max_workers=5.
with ScheduledThreadPoolExecutor(5) as p:
    # One-shot: run say('hello 1') once, after an initial delay of 1.
    p.schedule(1, say, 'hello 1')
    # Fixed rate: initial delay 0, period 2. The returned future is kept so
    # this task can be cancelled below.
    f = p.schedule_fixed_rate(0, 2, say, 'hello 2')
    # Fixed delay: initial delay 0, then a delay of 3 between runs.
    p.schedule_fixed_delay(0, 3, say, 'hello 3')
    # Give the scheduled tasks some time to run before cancelling one.
    time.sleep(6)
    say("cancelling: %s" % f)
    f.cancel()
    # Sleep again before leaving the block; leaving it shuts the executor
    # down (Executor.__exit__ calls shutdown(wait=True)).
    time.sleep(10)
    say("shutting down")


More information about the Python-bugs-list mailing list