Persistent Queue implementations?

Karl A. Krueger kkrueger at example.edu
Fri Dec 27 14:29:23 EST 2002


Aahz <aahz at pythoncraft.com> wrote:
> In article <aufnif$f5$1 at baldur.whoi.edu>,
> Karl A. Krueger <kkrueger at example.edu> wrote:
>>A daemon thread enforces the timeout -- every timeout interval, it runs
>>through the queue and clears the "handed out" flags from items that have
>>been open for that amount of time or more.  (If everything was handed
>>out before, this also clears the emptiness sema.)
>>
>>This is obviously no good if the queue must be processed in strict
>>order.  It should be OK for a multithreaded mail filter, which is one of
>>the applications I'm thinking of using this with.  :)
> 
> Instead of keeping the object in the queue, stick it in a holding dict
> that also gets pickled.

Implemented.  TransQueue below is like PickleQueue, but additionally
will requeue items that have been dequeued, if they have not been
released by the time a timeout passes.

There has to be a cleaner way to implement the timer in the method
__requeue_forever, though ...



# TransQueue -- persistent transactional queue using cPickle

import Queue, cPickle, thread, threading, time

class TransQueue(Queue.Queue):
    """A multi-producer, multi-consumer, transactional persistent queue.

    get() returns a (number, item) pair and moves the item into a holding
    store instead of discarding it.  The consumer calls release(number)
    once it has finished with the item.  An item that is not released
    within the timeout is put back on the queue, under a new number, by a
    daemon cleanup thread.

    The serial counter, the queue and the holding store are pickled to
    the persistence file on every put, get and release, and are reloaded
    from it when the queue is constructed.

    Locking: ssema guards serial, queue, store and the pickle file.  It
    is only acquired after (or without) the base class's own locks, and
    put()/get() are never called while it is held, so the two levels of
    locking cannot deadlock against each other.
    """

    def __init__(self, filename, maxsize=0, timeout=600):
        """Initialize a transactional persistent queue.

        filename is the name of the persistence file (pickle file).  Its
        contents are unpickled at start-up, so it must come from a
        trusted source.

        maxsize is the maximum length of the queue, in items (default
        infinite).

        timeout is the interval in seconds an item is permitted to be
        dequeued and not released, before it is requeued.  The cleanup
        thread sweeps every timeout / 2 seconds, so an unreleased item
        returns to the queue between timeout and about 1.5 * timeout
        seconds after it was dequeued.
        """
        self.filename = filename

        # Must exist before Queue.Queue.__init__ runs, because that calls
        # _init(), which takes this lock.
        self.ssema = thread.allocate_lock()

        Queue.Queue.__init__(self, maxsize)

        # The base class set up its empty/full semaphores without knowing
        # that _init() may have loaded items from disk; bring them in
        # line with the restored queue.
        # NOTE(review): esema/fsema are internals of lock-based Queue
        # implementations; the guard skips this step where they do not
        # exist -- confirm against the target Python version.
        if hasattr(self, 'esema'):
            if self.queue:
                self.esema.release()
            if self._full():
                self.fsema.acquire()

        # Spawn daemonic cleanup thread
        self.cleanup = threading.Thread(
                target=self.__requeue_forever,
                args=(timeout,)
        )
        self.cleanup.setDaemon(1)
        self.cleanup.start()

    def _init(self, maxsize):
        # Implements Queue protocol _init for persistent queue.
        # Loads any saved state from the pickle file, then opens the file
        # for writing and saves the state straight back.
        import sys

        self.maxsize = maxsize
        self.ssema.acquire()
        try:
            try:
                # Binary mode: __sync writes a binary pickle protocol,
                # which must not go through newline translation.
                self.readfile = open(self.filename, 'rb')
                try:
                    # NOTE: unpickling executes whatever the file
                    # describes; only use trusted persistence files.
                    (self.serial, self.queue, self.store) = \
                        cPickle.load(self.readfile)
                finally:
                    self.readfile.close()
            except IOError:
                # Fetched via sys.exc_info() so this parses on every
                # Python version regardless of "except ... as" support.
                err = sys.exc_info()[1]
                if err.errno != 2:
                    # Some other I/O problem; re-raise with the original
                    # traceback intact.
                    raise
                # errno 2 (ENOENT): file doesn't exist, start empty.
                self.__start_empty()
            except EOFError:
                # File was null, start empty.
                self.__start_empty()

            # Rewrite file, so it's created if it doesn't exist,
            # and raises an exception now if we aren't allowed
            self.writefile = open(self.filename, 'wb')
            self.__sync()
        finally:
            self.ssema.release()

    def __start_empty(self):
        # Set up the state of a brand-new queue: nothing queued, nothing
        # handed out, numbering starts at zero.
        self.serial = 0
        self.queue = []
        self.store = {}

    def __sync(self):
        # Writes serial, queue and store to the pickle file.
        # The caller must hold ssema, so that writes never interleave and
        # the state is not modified while it is being pickled.
        # Bytes left over from a longer, earlier pickle are not removed;
        # loading stops at the end of the first pickle in the file.
        self.writefile.seek(0)
        cPickle.dump((self.serial, self.queue, self.store), self.writefile, 1)
        self.writefile.flush()

    def _put(self, item):
        # Implements Queue protocol _put for persistent queue.
        # Tags the item with the next serial number and saves the state.
        self.ssema.acquire()
        try:
            self.queue.append((self.serial, item))
            self.serial += 1
            self.__sync()
        finally:
            self.ssema.release()

    def _get(self):
        # Implements Queue protocol _get for persistent queue.
        # Moves the head of the queue into the holding store, stamped
        # with the current time, and returns it as (number, item).
        self.ssema.acquire()
        try:
            (num, item) = self.queue[0]
            del self.queue[0]
            self.store[num] = (time.time(), item)
            self.__sync()
        finally:
            self.ssema.release()
        return (num, item)

    def release(self, num):
        """Release an item by number from the backup store.

        num is the number returned alongside the item by get().  The
        removal is written to the persistence file, so a released item
        does not come back after a restart.

        Raises KeyError if num is not in the store, for instance because
        the item already timed out and was requeued.
        """
        self.ssema.acquire()
        try:
            del self.store[num]
            self.__sync()
        finally:
            # Always unlock, even on KeyError, so one bad release cannot
            # wedge every later get() and release().
            self.ssema.release()

    def __requeue(self, num):
        # Return a numbered item from the backup store to the queue.
        # Does nothing if the item has been released in the meantime.
        self.ssema.acquire()
        try:
            if num not in self.store:
                return
            entry = self.store[num]
            del self.store[num]
        finally:
            self.ssema.release()

        # ssema is free again here: put() takes the queue's own locks and
        # then ssema (in _put), the same order get() uses.
        try:
            # Non-blocking, so a full queue cannot stall the cleanup
            # thread indefinitely.
            self.put(entry[1], 0)
        except Queue.Full:
            # No room right now.  Put the item back in the store with its
            # original timestamp so the next sweep retries it, and save
            # the state in case another thread synced without it.
            self.ssema.acquire()
            try:
                self.store[num] = entry
                self.__sync()
            finally:
                self.ssema.release()

    def __requeue_by_time(self, cutoff, sema=None):
        # Requeue all items stored more than cutoff seconds ago.
        # If a semaphore is given, release it when the sweep is over,
        # whether or not the sweep succeeded.
        try:
            cuttime = time.time() - cutoff

            # Only pick out the expired numbers under the lock; the
            # requeueing itself happens without it (see __requeue).
            self.ssema.acquire()
            try:
                expired = []
                for (num, entry) in self.store.items():
                    if entry[0] < cuttime:
                        expired.append(num)
            finally:
                self.ssema.release()

            # Lowest number first, so requeued items keep the order in
            # which they were originally queued.
            expired.sort()
            for num in expired:
                self.__requeue(num)
        finally:
            if sema is not None:
                sema.release()

    def __requeue_forever(self, cutoff):
        # Body of the cleanup thread: sweep the backup store twice per
        # timeout interval, forever.
        # Float division, so a small integer timeout does not truncate
        # to a zero-length sleep.
        interval = cutoff / 2.0
        while 1:
            time.sleep(interval)
            self.__requeue_by_time(cutoff)


-- 
Karl A. Krueger <kkrueger at example.edu>
Woods Hole Oceanographic Institution
Email address is spamtrapped.  s/example/whoi/
"Outlook not so good." -- Magic 8-Ball Software Reviews



More information about the Python-list mailing list