Best way to implement a timed queue?

Jan Dries jan.dries at dcube-resource.be
Thu Jan 4 10:51:27 EST 2007


Thomas Ploch wrote:
> I am having troubles with implementing a timed queue. I am using the
> 'Queue' module to manage several queues. But I want a timed access, i.e.
> only 2 fetches per second max. I am horribly stuck on even how I
> actually could write it. Has somebody done that before? And when yes,
> how is the best way to implement it?

I don't know about "the best way" to implement it, but I once solved a 
similar problem with the following scenario.

The idea is that you create a "token" queue, a Queue object of the 
desired depth, 2 in your case. You then have a separate thread that, in 
a loop, puts two "tokens" on the queue and then puts itself to sleep for 
one second.

And the worker thread or threads that must be doing whatever twice per 
second try to get an object from the queue prior to doing their thing.

The following piece of code does the trick. Note that it allows worker 
threads to do something two times every second, and not once every half 
second, though it could easily be modified to accomodate that too.
Also note that the code below keeps running forever as worker threads 
never stop working. You will probably want to change that.

Regards,
Jan


import time
from Queue import Queue,Full,Empty
from thread import start_new_thread

class Manager(object):
     def __init__(self, number_of_workers=10, max_per_sec=5):
         self.MAX_PER_SEC = max_per_sec
         self.NUMBER_OF_WORKERS = number_of_workers
         self.timelimit = Queue(self.MAX_PER_SEC)
         self.donelock = Queue(0)
         self.finished = False

     def do_work(self,number):
         print "Starting worker thread %s" % number
         while True:
             if self.get_time():
                 # do whatever can only be done x times per second
                 print "Worker %s doing work" % number
                 time.sleep(3)  # simulate worker doing some work

         self.signal_done()

     def signal_done(self):
         self.donelock.put(None,True)

     def wait_done(self):
         for i in range(0,self.MAX_PER_SEC):
             self.donelock.get(True)
         self.finished = True

     def feed_time(self):
         while not self.is_finished():
             for i in range(0,self.MAX_PER_SEC):
                 self.insert_time()
             time.sleep(1)

     def insert_time(self):
         try:
             self.timelimit.put_nowait(None)
         except Full:
             pass

     def get_time(self):
         try:
             self.timelimit.get(True,10)
             return True
         except Empty:
             return False

     def is_finished(self):
         return self.finished

     def start_worker_threads(self):
         for i in range(0,self.NUMBER_OF_WORKERS):
             start_new_thread(self.do_work,(i + 1,))

     def start_time_thread(self):
         start_new_thread(self.feed_time,())

     def run(self):
         self.start_time_thread()
         self.start_worker_threads()
         self.wait_done()

def main():
     Manager(10,2).run()


if __name__ == "__main__":
     main()




More information about the Python-list mailing list