Cost of Queue.put

James rent.lupin.road at gmail.com
Mon Mar 17 23:13:29 EDT 2008


Hi all,
I'm struggling with the following problem:

I need to disconnect a web service invocation from the addition of
some content to a search engine, so that the web service will always
return in reasonable time.

Basically, what I need is a multi-process safe persistent quick queue.
The arrangement I had was a simple XML-RPC service running on the web
server which the various web server threads POST the relevant search
engine updates to. These updates were added to a persistent queue
built on top of Queue.Queue and cPickle, and there was a separate
thread in the XML-RPC server actually adding the updates to the search
engine.

However, the CPU consumption of the XML-RPC server seems to grow
pretty much linearly with the length of the persistent queue. Isn't
Queue.put O(1)?

I include the code for the persistent queue below - it's a modified
version of this code: http://mail.python.org/pipermail/python-list/2002-December/177394.html

I suppose the addition of cPickle to the mix must hurt Queue.put()...
Any recommendations for alternative persistent queues?

Thanks,
James


class PickleQueue(Queue.Queue):
    """A multi-producer, multi-consumer, persistent queue."""


    def __init__(self, filename, maxsize=0):
        """Initialize a persistent queue with a filename and maximum
size.


        The filename is used as a persistent data store for the
        queue.
        If maxsize <= 0, the queue size is infinite.
        """
        self.filename = filename
        Queue.Queue.__init__(self, maxsize)

    def _init(self, maxsize):
        # Implements Queue protocol _init for persistent queue.
        # Sets up the pickle files.
        self.maxsize = maxsize
        try:
            self.readfile = file(self.filename, 'r')
            self.queue = cPickle.load(self.readfile)
            self.readfile.close()
        except IOError, err:
            if err.errno == 2:
                # File doesn't exist, continue ...
                self.queue = Queue.deque()
            else:
                # Some other I/O problem, reraise error
                raise err
        except EOFError:
            # File was null? Continue ...
            self.queue = Queue.deque()

        # Rewrite file, so it's created if it doesn't exist,
        # and raises an exception now if we aren't allowed
        self.writefile = file(self.filename, 'w')
        cPickle.dump(self.queue, self.writefile, 1)
        log.info("(PickleQueue._init) created queue")


    def __sync(self):
        # Writes the queue to the pickle file.
        self.writefile.seek(0)
        cPickle.dump(self.queue, self.writefile, 1)
        self.writefile.flush()


    def _put(self, item):
        # Implements Queue protocol _put for persistent queue.
        self.queue.append(item)
        self.__sync()


    def _get(self):
        # Implements Queue protocol _get for persistent queue.
        item = self.queue.popleft()
        self.__sync()
        return item



More information about the Python-list mailing list