Doubts about how implementing asynchronous timeouts through a heap

Josiah Carlson josiah.carlson at gmail.com
Tue Aug 5 12:45:48 EDT 2008


On Jul 12, 12:16 pm, Josiah Carlson <josiah.carl... at gmail.com> wrote:
> On Jul 9, 4:13 am, "Giampaolo Rodola'" <gne... at gmail.com> wrote:
>
> > Hi,
> > I'm trying to implement an asynchronous scheduler for asyncore to call
> > functions at a later time without blocking the main loop.
> > The logic behind it consists of:
>
> > - adding the scheduled functions into a heapified list
> > - calling a "scheduler" function at every loop which checks the
> > scheduled functions due to expire soonest
>
> > Note that, by using a heap, the first element of the list is always
> > supposed to be the one with the lowest timeout.
> > Here's the code I wrote:
>
> > <--- snippet --->
> > import heapq
> > import time
> > import sys
>
> > delayed_map = []
>
> > class delayed_call:
> >     """Calls a function at a later time.
>
> >     The instance returned is an object that can be used to cancel the
> >     scheduled call, by calling its cancel() method.
> >     It may also be rescheduled by calling its delay() or reset() methods.
> >     """
>
> >     def __init__(self, delay, target, *args, **kwargs):
> >         """
> >         - delay: the number of seconds to wait
> >         - target: the callable object to call later
> >         - args: the arguments to call it with
> >         - kwargs: the keyword arguments to call it with
> >         """
> >         assert callable(target), "%s is not callable" %target
> >         assert sys.maxint >= delay >= 0, \
> >             "%s is not greater than or equal to 0 seconds" % delay
> >         self.__delay = delay
> >         self.__target = target
> >         self.__args = args
> >         self.__kwargs = kwargs
> >         # seconds from the epoch at which to call the function
> >         self.timeout = time.time() + self.__delay
> >         self.cancelled = False
> >         heapq.heappush(delayed_map, self)
>
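> >     def __lt__(self, other):
> >         # Python 3's heapq compares with "<", so define __lt__ as well
> >         return self.timeout < other.timeout
>
> >     def __le__(self, other):
> >         return self.timeout <= other.timeout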
>
> >     def active(self):
> >         """Return True if this scheduler has not been cancelled."""
> >         return not self.cancelled
>
> >     def call(self):
> >         """Call this scheduled function."""
> >         self.__target(*self.__args, **self.__kwargs)
>
> >     def reset(self):
> >         """Reschedule this call resetting the current countdown."""
> >         assert not self.cancelled, "Already cancelled"
> >         self.timeout = time.time() + self.__delay
> >         if delayed_map[0] is self:
> >             heapq.heapify(delayed_map)
>
> >     def delay(self, seconds):
> >         """Reschedule this call for a later time."""
> >         assert not self.cancelled, "Already cancelled."
> >         assert sys.maxint >= seconds >= 0, \
> >             "%s is not greater than or equal to 0 seconds" % seconds
> >         self.__delay = seconds
> >         self.reset()
>
> >     def cancel(self):
> >         """Unschedule this call."""
> >         assert not self.cancelled, "Already cancelled"
> >         del self.__target, self.__args, self.__kwargs
> >         if self in delayed_map:
> >             if delayed_map[0] is self:
> >                 delayed_map.remove(self)
> >                 heapq.heapify(delayed_map)
> >             else:
> >                 delayed_map.remove(self)
> >         self.cancelled = True
>
> > def fun(arg):
> >     print arg
>
> > a = delayed_call(0.6, fun, '0.6')
> > b = delayed_call(0.5, fun, '0.5')
> > c = delayed_call(0.4, fun, '0.4')
> > d = delayed_call(0.3, fun, '0.3')
> > e = delayed_call(0.2, fun, '0.2')
> > f = delayed_call(0.1, fun, '0.1')
>
> > while delayed_map:
> >     now = time.time()
> >     while delayed_map and now >= delayed_map[0].timeout:
> >         delayed = heapq.heappop(delayed_map)
> >         try:
> >             delayed.call()
> >         finally:
> >             if not delayed.cancelled:
> >                 delayed.cancel()
> >     time.sleep(0.01)
> > </--- snippet --->
>
> > Here come the questions.
> > Since the timeouts of the scheduled functions contained in the
> > list can change when I reset() or cancel() them, I don't know exactly
> > *when* the list needs to be heapified.
> > By doing some tests I came to the conclusion that I need to heapify()
> > the list only when the function I reset() or cancel() is the *first of
> > the list*, but I'm not absolutely sure about it.
> > When do you think it is necessary to call heapify()?
> > I wrote a short test suite which tests the code above and I didn't
> > notice any strange behavior, but since I don't know much about the
> > logic behind heaps I'd need some help.
> > Thanks a lot in advance.
>
> > --- Giampaolo
> > http://code.google.com/p/pyftpdlib/
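On the question of when heapify() is needed: the heapq invariant is `a[k] <= a[2*k+1]` and `a[k] <= a[2*k+2]` for every k, so it can break whenever *any* element's key changes, or any element is taken out with `list.remove()`, not only when that element sits at index 0. The Python 3 sketch below (`is_heap` is a hypothetical helper) shows both cases on a small integer heap that is not simply sorted; calling `heapq.heapify()` again repairs it in O(n).

```python
import heapq


def is_heap(a):
    """Return True if list a satisfies the heapq invariant
    a[k] <= a[2*k+1] and a[k] <= a[2*k+2] wherever those children exist."""
    return all(a[k] <= a[c]
               for k in range(len(a))
               for c in (2 * k + 1, 2 * k + 2)
               if c < len(a))


# A valid heap that is not simply sorted.
heap = [1, 5, 2, 6, 7, 3, 4]
assert is_heap(heap)

# 1) Changing the key of a NON-root element can break the invariant.
changed = list(heap)
changed[1] = 10          # 10 is now larger than its children 6 and 7
print(is_heap(changed))  # False
heapq.heapify(changed)   # O(n) repair
print(is_heap(changed))  # True

# 2) list.remove() on a NON-root element shifts later items and can
#    also break the invariant.
removed = list(heap)
removed.remove(5)        # -> [1, 2, 6, 7, 3, 4]; 6 now sits above 4
print(is_heap(removed))  # False
heapq.heapify(removed)
print(is_heap(removed))  # True
```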
>
> I dug through my old pair heap implementation, did a little hacking on
> heapq, and wrote a task scheduler system that plugs into asyncore.
>
> To schedule a task, you use:
>     task = asyncore.schedule_task(schedule, delay, callable, *args, **kwargs)
>
> Once you have that task object, you can then use:
>     asyncore.reschedule_task(schedule, delay, task)
>     asyncore.abs_reschedule_task(schedule, time, task)
> ... to reschedule the task into the future (or past).
>
> You can also use:
>     asyncore.delete_task(schedule, task)
> ... to completely remove the task from the scheduler.
>
> Each of these operations is O(log n), where n is the number of
> tasks currently known to the scheduler.
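The pair heap and the `asyncore.*_task` functions above are not shown in this thread. As a rough stand-in, this Python 3 sketch gets comparable costs from a plain `heapq` heap with lazy deletion, the approach described in the priority-queue notes of the heapq documentation: scheduling and rescheduling push a new entry in O(log n), and deleting only marks the old entry dead so it is skipped when it reaches the top. The `Scheduler` class, its method names and its `clock` parameter are all hypothetical.

```python
import heapq
import itertools
import time


class Scheduler:
    """Hypothetical timer heap using lazy deletion (see the heapq docs).

    schedule() and reschedule() cost O(log n); delete() is O(1) and just
    marks the old heap entry dead so it is skipped when popped.
    """

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []                # entries: [deadline, seq, task]
        self._entries = {}             # task -> its live entry
        self._seq = itertools.count()  # tie-breaker for equal deadlines

    def schedule(self, delay, task):
        """Run task (a zero-argument callable) delay seconds from now."""
        if task in self._entries:
            self.delete(task)          # at most one live entry per task
        entry = [self._clock() + delay, next(self._seq), task]
        self._entries[task] = entry
        heapq.heappush(self._heap, entry)

    def reschedule(self, delay, task):
        """Move a pending task to delay seconds from now."""
        self.schedule(delay, task)

    def delete(self, task):
        """Cancel a pending task without touching the heap order."""
        entry = self._entries.pop(task)
        entry[2] = None                # dead marker; deadline/seq unchanged

    def run_due(self):
        """Pop and run every live task whose deadline has passed."""
        now = self._clock()
        while self._heap and self._heap[0][0] <= now:
            _, _, task = heapq.heappop(self._heap)
            if task is not None:       # skip entries cancelled earlier
                del self._entries[task]
                task()
```

Dead entries stay in the list until they reach the top, so a workload that cancels many far-future tasks trades memory for speed; rebuilding the heap from the live entries now and then is one way around that.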
>
> To accommodate the new scheduler, asyncore.loop() now has the
> following call signature:
>     def loop(timeout=30.0, use_poll=False, map=None, count=None,
>              schedule=None, use_schedule=False):
>
> To help prevent poll_fun() starvation (in the case of long-running
> scheduled tasks), the task execution window is set to now + .01
> seconds just before the poll_fun() call.  That is, the window is set,
> poll_fun() is called (which should handle current I/O operations), and
> then all tasks that are due before the end of the task window
> (now + .01 seconds, computed before the poll_fun() call) are run.
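A hypothetical Python 3 sketch of one loop pass with that task window: `loop_once` and its `poll` argument (a stand-in for asyncore's poll_fun) are illustrative names, not asyncore API, and the heap is assumed to hold `(deadline, seq, callable)` tuples. The window end is computed before polling, and only tasks due by then are run afterwards.

```python
import heapq
import time

WINDOW = 0.01  # seconds; the .01 task window mentioned in the post


def loop_once(heap, poll, clock=time.monotonic, idle_timeout=30.0):
    """Hypothetical single pass of an I/O loop with a task window.

    heap -- list of (deadline, seq, callable) tuples kept by heapq
    poll -- stand-in for asyncore's poll_fun; called as poll(timeout)
    """
    now = clock()
    window_end = now + WINDOW          # fixed BEFORE polling
    # Block in poll() no longer than until the earliest deadline.
    timeout = max(0.0, heap[0][0] - now) if heap else idle_timeout
    poll(timeout)                      # handle pending socket I/O first
    # Then run only the tasks due by window_end; anything due later
    # waits for a later pass.
    while heap and heap[0][0] <= window_end:
        _, _, task = heapq.heappop(heap)
        task()
```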
>
> Asyncore objects will not gain a set_scheduler() method, nor will they
> gain a schedule keyword argument on instantiation.  Why?  Because the
> scheduler is not required for socket I/O to work properly.  If you
> want to use the scheduler from your own subclasses,
> asyncore.<schedule_fcn>(asyncore.scheduled_tasks, ...) should be
> sufficient.
>
> This scheduler can be easily plugged into other systems, and it's
> likely that I'll add it as an interactive scheduler to sched.py, put
> the pair heap implementation into collections, and call it good.
>
>  - Josiah

After some personal thought a couple of weeks back, I instead added bits
of functionality to sched.py to make all of these features possible,
and then used it from asyncore.  I discussed why a couple of weeks ago
on my blog: http://chouyu-31.livejournal.com/316112.html
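The sched.py changes from the blog post are not reproduced here. For comparison, the sched module in current Python 3 (3.3 and later) can be driven from an outside loop with `run(blocking=False)`, which runs whatever is already due and returns instead of sleeping, while `cancel()` removes a pending event. This is a sketch of that stdlib API, not necessarily the patch described above; the socket-polling step is only indicated by a comment.

```python
import sched
import time

# sched.scheduler(timefunc, delayfunc); run(blocking=False) needs Python 3.3+.
s = sched.scheduler(time.monotonic, time.sleep)
fired = []

s.enter(0.02, 1, fired.append, argument=("first",))
s.enter(0.01, 1, fired.append, argument=("second",))
doomed = s.enter(0.03, 1, fired.append, argument=("cancelled",))
s.cancel(doomed)                  # drop a pending event before it runs

while not s.empty():
    # Run whatever is already due, then return instead of sleeping.
    s.run(blocking=False)
    # A real event loop would poll its sockets here (hypothetical step).
    time.sleep(0.005)

print(fired)                      # ['second', 'first']
```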

 - Josiah



More information about the Python-list mailing list