Doubts about how to implement asynchronous timeouts through a heap

Josiah Carlson josiah.carlson at gmail.com
Thu Jul 10 20:31:41 EDT 2008


On Jul 10, 11:30 am, Josiah Carlson <josiah.carl... at gmail.com> wrote:
> On Jul 9, 4:13 am, "Giampaolo Rodola'" <gne... at gmail.com> wrote:
>
> > Hi,
> > I'm trying to implement an asynchronous scheduler for asyncore to call
> > functions at a later time without blocking the main loop.
> > The logic behind it consists of:
>
> > - adding the scheduled functions into a heapified list
> > - calling a "scheduler" function on every loop iteration, which checks
> > the scheduled functions due to expire soonest
>
> > Note that, because it is a heap, the first element of the list is
> > always supposed to be the one with the lowest timeout.
> > Here's the code I wrote:
>
> > <--- snippet --->
> > import heapq
> > import time
> > import sys
>
> > delayed_map = []
>
> > class delayed_call:
> >     """Calls a function at a later time.
>
> >     The instance returned is an object that can be used to cancel the
> >     scheduled call, by calling its cancel() method.
> >     It may also be rescheduled by calling its delay() or reset() methods.
> >     """
>
> >     def __init__(self, delay, target, *args, **kwargs):
> >         """
> >         - delay: the number of seconds to wait
> >         - target: the callable object to call later
> >         - args: the arguments to call it with
> >         - kwargs: the keyword arguments to call it with
> >         """
> >         assert callable(target), "%s is not callable" %target
> >         assert sys.maxint >= delay >= 0, "%s is not greater than or equal " \
> >                                            "to 0 seconds" % (delay)
> >         self.__delay = delay
> >         self.__target = target
> >         self.__args = args
> >         self.__kwargs = kwargs
> >         # seconds from the epoch at which to call the function
> >         self.timeout = time.time() + self.__delay
> >         self.cancelled = False
> >         heapq.heappush(delayed_map, self)
>
> >     def __le__(self, other):
> >         return self.timeout <= other.timeout
>
> >     def active(self):
> >         """Return True if this scheduler has not been cancelled."""
> >         return not self.cancelled
>
> >     def call(self):
> >         """Call this scheduled function."""
> >         self.__target(*self.__args, **self.__kwargs)
>
> >     def reset(self):
> >         """Reschedule this call resetting the current countdown."""
> >         assert not self.cancelled, "Already cancelled"
> >         self.timeout = time.time() + self.__delay
> >         if delayed_map[0] is self:
> >             heapq.heapify(delayed_map)
>
> >     def delay(self, seconds):
> >         """Reschedule this call for a later time."""
> >         assert not self.cancelled, "Already cancelled."
> >         assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \
> >                                            "to 0 seconds" %(seconds)
> >         self.__delay = seconds
> >         self.reset()
>
> >     def cancel(self):
> >         """Unschedule this call."""
> >         assert not self.cancelled, "Already cancelled"
> >         del self.__target, self.__args, self.__kwargs
> >         if self in delayed_map:
> >             if delayed_map[0] is self:
> >                 delayed_map.remove(self)
> >                 heapq.heapify(delayed_map)
> >             else:
> >                 delayed_map.remove(self)
> >         self.cancelled = True
>
> > def fun(arg):
> >     print arg
>
> > a = delayed_call(0.6, fun, '0.6')
> > b = delayed_call(0.5, fun, '0.5')
> > c = delayed_call(0.4, fun, '0.4')
> > d = delayed_call(0.3, fun, '0.3')
> > e = delayed_call(0.2, fun, '0.2')
> > f = delayed_call(0.1, fun, '0.1')
>
> > while delayed_map:
> >     now = time.time()
> >     while delayed_map and now >= delayed_map[0].timeout:
> >         delayed = heapq.heappop(delayed_map)
> >         try:
> >             delayed.call()
> >         finally:
> >             if not delayed.cancelled:
> >                 delayed.cancel()
> >     time.sleep(0.01)
> > </--- snippet --->
>
> > Here are my questions.
> > Since the timeouts of the scheduled functions contained in the list
> > can change when I reset() or cancel() them, I don't know exactly
> > *when* the list needs to be heapified.
> > From some tests I concluded that I need to heapify() the list only
> > when the function I reset() or cancel() is the *first in the list*,
> > but I'm not absolutely sure about it.
> > When do you think it is necessary to call heapify()?
> > I wrote a short test suite for the code above and didn't notice any
> > strange behavior, but since I don't know much about the logic behind
> > heaps I'd like some help.
> > Thanks a lot in advance.
>
> > --- Giampaolo
> > http://code.google.com/p/pyftpdlib/
>
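Giampaolo's hypothesis (heapify only when the affected item is first) can be checked by testing the heap invariant directly. A minimal sketch in Python 3, where `is_heap` is a hypothetical helper and plain numbers stand in for the `timeout` values of `delayed_call` objects. It suggests that a `list.remove()` or an in-place timeout change on an item that is *not* first can also break the invariant:

```python
import heapq

def is_heap(items):
    """Hypothetical helper: True if items satisfies heapq's min-heap
    invariant, items[k] <= items[2*k+1] and items[k] <= items[2*k+2]."""
    n = len(items)
    return all(items[k] <= items[child]
               for k in range(n)
               for child in (2 * k + 1, 2 * k + 2)
               if child < n)

# A valid heap of timeouts (heap[0] is the soonest).
heap = [1, 5, 2, 6, 7, 3, 4]
assert is_heap(heap)

# cancel() on a non-first item: list.remove() shifts later elements
# left, so parent/child positions no longer line up.
removed = heap[:]
removed.remove(5)          # -> [1, 2, 6, 7, 3, 4]; 6 now sits above 4
print(is_heap(removed))    # False

# reset()/delay() on a non-first item: its timeout changes in place.
changed = heap[:]
changed[1] = 8             # later timeout, now larger than children 6 and 7
print(is_heap(changed))    # False

# Re-heapifying (O(n)) restores the invariant in both cases.
heapq.heapify(removed)
heapq.heapify(changed)
print(is_heap(removed), is_heap(changed))  # True True
```

In other words, the position of the changed item does not decide whether a fix is needed; any key change, or any removal from the middle of the list, can leave the list out of heap order.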
> According to a quick scan, there are some inefficiencies in your
> code.  In particular, you don't need to re-heapify if the item you
> need to remove is the first item; you only need to heappop().
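For the case described here, where the entry to drop is `heap[0]`, `heapq.heappop()` removes it and restores the invariant in O(log n). The related stdlib call `heapq.heapreplace()` covers rescheduling the first entry. A minimal Python 3 sketch, with plain numbers standing in for timeouts:

```python
import heapq

heap = [1, 5, 2, 6, 7, 3, 4]   # timeouts; heap[0] is the soonest

# Cancel the soonest entry: pop it instead of list.remove() + heapify().
first = heapq.heappop(heap)     # O(log n)
print(first, heap[0])           # 1 2

# Reschedule the (new) soonest entry to a later timeout: heapreplace()
# pops heap[0] and pushes the replacement in a single O(log n) step.
old = heapq.heapreplace(heap, 9)
print(old, heap[0])             # 2 3
```

With `delayed_call` objects the same idea would mean updating `heap[0].timeout` and then calling `heapq.heapreplace(heap, heap[0])`. Note that in Python 3 heapq orders items with `<`, so such a class would need `__lt__` rather than only `__le__`.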
>
> In any case, if the Python standard library heapq module supported
> non-lists as containers, then the pair heap implementation I wrote a
> couple of years ago would be perfect for this particular task.  Because
> heapq was rewritten in C, we can't reuse that implementation without a
> bit of monkeypatching (it offered insertion/removal of arbitrary
> entries in the heap in O(log n) time, which is significantly faster
> than the O(n) time of your implementation).
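The pair heap itself isn't shown in the thread. As a stand-in, here is a different structure with the same O(log n) bound for arbitrary removal: an ordinary binary heap that records each item's index in a dict, so `remove()` can fill the hole with the last entry and sift it into place. This is a sketch only; `IndexedHeap` and its methods are hypothetical names, not part of heapq, and items must be hashable and unique.

```python
class IndexedHeap:
    """Binary min-heap of [key, item] entries with an item -> index map,
    so remove() and update() on any entry run in O(log n)."""

    def __init__(self):
        self._heap = []   # list of [key, item]
        self._pos = {}    # item -> current index in self._heap

    def __len__(self):
        return len(self._heap)

    def push(self, key, item):
        self._heap.append([key, item])
        self._pos[item] = len(self._heap) - 1
        self._sift_up(len(self._heap) - 1)

    def pop(self):
        """Remove and return the (key, item) pair with the smallest key."""
        key, item = self._heap[0]
        self.remove(item)
        return key, item

    def remove(self, item):
        """Remove an arbitrary item (e.g. a cancelled call)."""
        i = self._pos.pop(item)
        last = self._heap.pop()
        if i < len(self._heap):
            # Fill the hole with the former last entry, then sift it
            # up or down, whichever direction restores the invariant.
            self._heap[i] = last
            self._pos[last[1]] = i
            self._sift_down(self._sift_up(i))

    def update(self, item, key):
        """Change an item's key in place (e.g. a reset() or delay())."""
        i = self._pos[item]
        self._heap[i][0] = key
        self._sift_down(self._sift_up(i))

    def _swap(self, i, j):
        h = self._heap
        h[i], h[j] = h[j], h[i]
        self._pos[h[i][1]] = i
        self._pos[h[j][1]] = j

    def _sift_up(self, i):
        while i > 0:
            parent = (i - 1) // 2
            if not self._heap[i][0] < self._heap[parent][0]:
                break
            self._swap(i, parent)
            i = parent
        return i

    def _sift_down(self, i):
        n = len(self._heap)
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < n and self._heap[child][0] < self._heap[smallest][0]:
                    smallest = child
            if smallest == i:
                return i
            self._swap(i, smallest)
            i = smallest


h = IndexedHeap()
for key, name in [(0.6, "a"), (0.5, "b"), (0.4, "c"),
                  (0.3, "d"), (0.2, "e"), (0.1, "f")]:
    h.push(key, name)
h.remove("c")          # cancel an entry that is not first
h.update("a", 0.05)    # reschedule "a" to run first
print([h.pop()[1] for _ in range(len(h))])  # ['a', 'f', 'e', 'd', 'b']
```

The extra dict costs memory and a little bookkeeping on every swap, in exchange for never needing a full O(n) heapify().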
>
> What I'm thinking is that we should add a pair heap implementation to
> the heapq module (which can cache the pure Python functions if it
> needs to use them), which would then allow us to use that (and others
> to use it generally), add scheduling, etc.
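On the scheduling side, one technique that already works with the list-based heapq module is the lazy-deletion approach from the heapq documentation's priority-queue notes: cancel() only marks an entry, and marked entries are discarded when they reach the top. This is a different technique from the pair heap proposed here. A Python 3 sketch, assuming a hypothetical `Scheduler` class with `call_later()`, `cancel()`, `reschedule()` and `run_due()` methods and an injectable `clock` so it can be driven without sleeping:

```python
import heapq
import itertools
import time


class Scheduler:
    """Timer heap with lazy cancellation: cancel() marks an entry in O(1),
    and marked entries are skipped when they reach the top of the heap."""

    def __init__(self, clock=time.time):
        self._heap = []                # entries: [deadline, seq, callback, args]
        self._seq = itertools.count()  # tie-breaker; callbacks are never compared
        self._clock = clock

    def call_later(self, delay, callback, *args):
        """Schedule callback(*args) after delay seconds; return a handle."""
        entry = [self._clock() + delay, next(self._seq), callback, args]
        heapq.heappush(self._heap, entry)   # O(log n)
        return entry

    def cancel(self, entry):
        """Mark entry as cancelled; it stays in the heap until popped."""
        entry[2] = None

    def reschedule(self, entry, delay):
        """Cancel entry and schedule the same call again; return new handle."""
        callback, args = entry[2], entry[3]
        if callback is None:
            raise ValueError("already cancelled")
        self.cancel(entry)
        return self.call_later(delay, callback, *args)

    def run_due(self):
        """Run every non-cancelled call whose deadline has passed."""
        now = self._clock()
        ran = 0
        while self._heap and self._heap[0][0] <= now:
            entry = heapq.heappop(self._heap)
            callback, args = entry[2], entry[3]
            if callback is not None:   # skip lazily cancelled entries
                callback(*args)
                ran += 1
        return ran


# Drive the scheduler with a fake clock instead of sleeping.
fake_now = [0.0]
s = Scheduler(clock=lambda: fake_now[0])
fired = []
a = s.call_later(0.6, fired.append, "0.6")
b = s.call_later(0.1, fired.append, "0.1")
c = s.call_later(0.3, fired.append, "0.3")
s.cancel(c)                  # O(1), no heapify()
b = s.reschedule(b, 0.5)     # now due at 0.5, before "a"
fake_now[0] = 1.0
print(s.run_due(), fired)    # 2 ['0.1', '0.6']
```

The trade-off is that cancelled entries occupy memory until their deadline passes; a long-running server that cancels many far-future timeouts might occasionally rebuild the heap without them.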
>
> Regardless, it's a 2.7/3.1 feature, so it's ok if we take it slow.
>
>  - Josiah

Also...first I need to fix the unittest failures. ;)

 - Josiah



More information about the Python-list mailing list