Multiprocessing problem with producer/consumer

Piet van Oostrum piet at cs.uu.nl
Wed May 27 15:06:32 EDT 2009


>>>>> Wu Zhe <wu at madk.org> (WZ) wrote:

>WZ> I am writing a server program with one producer and multiple consumers.
>WZ> What confuses me is that only the first task the producer puts into the
>WZ> queue gets consumed; tasks enqueued after that are never consumed and
>WZ> remain in the queue forever.

>WZ> import time
>WZ> from multiprocessing import Process, Pool, Queue, cpu_count
>WZ> from http import httpserv

>WZ> def work(queue):
>WZ>     while True:
>WZ>         task = queue.get()
>WZ>         if task is None:
>WZ>             break
>WZ>         time.sleep(5)
>WZ>         print "task done:", task
>WZ>     queue.put(None)

>WZ> class Manager:
>WZ>     def __init__(self):
>WZ>         self.queue = Queue()
>WZ>         self.NUMBER_OF_PROCESSES = cpu_count()

>WZ>     def start(self):
>WZ>         self.workers = [Process(target=work, args=(self.queue,))
>WZ>                         for i in xrange(self.NUMBER_OF_PROCESSES)]
>WZ>         for w in self.workers:
>WZ>             w.start()

>WZ>         httpserv(self.queue)

>WZ>     def reload(self):
>WZ>         print "RELOAD"

>WZ>     def stop(self):
>WZ>         self.queue.put(None)
>WZ>         for i in range(self.NUMBER_OF_PROCESSES):
>WZ>             self.workers[i].join()
>WZ>         self.queue.close()

>WZ> Manager().start()

>WZ> The producer is an HTTP server which puts a task in the queue whenever it
>WZ> receives a request from the user. It seems that the consumer processes
>WZ> are still blocked when there are new tasks in the queue, which is weird.

How do you know there are still tasks in the queue?

When I replace your httpserv(self.queue) with:

        for i in range(100):
            self.queue.put(i)

it just works. So it seems probable to me that the problem is in
httpserv. Maybe it stalls, or maybe it puts a None in the queue? A None
would stop every worker in turn, because each worker puts the None back
before it exits. You could debug by logging the puts in the queue.
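
One way to log the puts is to hand httpserv a thin wrapper instead of the
raw queue. This is only a sketch: LoggingQueue and put_count are made-up
names, it assumes httpserv does nothing with the queue except call put(),
and it is written for Python 3 rather than the Python 2 of the original
post.

```python
import logging
from multiprocessing import Queue

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s %(processName)s %(message)s")
log = logging.getLogger("producer")


class LoggingQueue(object):
    """Hypothetical wrapper that logs every put before forwarding it."""

    def __init__(self, queue):
        self._queue = queue
        self.put_count = 0

    def put(self, task):
        self.put_count += 1
        log.debug("put #%d: %r", self.put_count, task)
        if task is None:
            # In the work() loop from the post, None ends a worker, and the
            # worker re-queues it, so the remaining workers stop as well.
            log.warning("None was put in the queue; workers will exit")
        self._queue.put(task)

    def get(self, *args, **kwargs):
        # Forwarded unchanged so the wrapper can stand in for the queue.
        return self._queue.get(*args, **kwargs)
```

In Manager.start() this would be used as
httpserv(LoggingQueue(self.queue)), while the workers keep the plain
queue. If the log shows only one put, or a None right after the first
task, the problem is on the producer side.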

>WZ> P.S. Two more questions unrelated to the above. First, I am not sure
>WZ> whether it's better to put the HTTP server in its own process rather than
>WZ> the main process; if so, how can I keep the main process running until
>WZ> all child processes end? Second, what's the best way to stop the HTTP
>WZ> server gracefully?

I don't think it is useful to put the HTTP server in its own process, as
the Manager process has hardly anything to do. But if you do, you can
make the main process wait by joining the worker processes at the end,
instead of inside stop().
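
A rough sketch of that layout, in Python 3. The produce() function is a
hypothetical stand-in for httpserv (it just enqueues a few tasks and then
the None sentinel), and main() is likewise an illustrative name; the
worker loop is the one from the post without the sleep.

```python
from multiprocessing import Process, Queue


def work(queue):
    """Consumer loop from the original post, minus the sleep."""
    while True:
        task = queue.get()
        if task is None:
            break
        print("task done:", task)
    queue.put(None)  # pass the sentinel on so the next worker stops too


def produce(queue, tasks):
    """Hypothetical stand-in for httpserv: enqueue tasks, then stop."""
    for task in tasks:
        queue.put(task)
    queue.put(None)


def main(num_workers=2, tasks=(1, 2, 3)):
    queue = Queue()
    workers = [Process(target=work, args=(queue,))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    server = Process(target=produce, args=(queue, list(tasks)))
    server.start()
    # The main process has nothing else to do, so it just waits here
    # until the server and every worker have finished.
    server.join()
    for w in workers:
        w.join()
    queue.close()
```

Call main() under an `if __name__ == "__main__":` guard, which
multiprocessing needs on platforms that start child processes by
spawning a fresh interpreter.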

Stopping the HTTP server: is it multithreaded? You can have a boolean
that indicates it should accept no new requests. Without more info about
the server it is hard to give a more detailed answer.
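
As an illustration of the boolean idea only: the sketch below uses the
standard library HTTPServer from Python 3 as a stand-in, since nothing is
known about the actual httpserv. Handler, serve_until_stopped and
stop_flag are made-up names, and the flag is a threading.Event rather
than a bare boolean so that another thread can set it safely.

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class Handler(BaseHTTPRequestHandler):
    """Placeholder handler; the real one would put a task in the queue."""

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok\n")

    def log_message(self, format, *args):
        pass  # keep the example quiet


def serve_until_stopped(server, stop_flag):
    # handle_request() gives up after server.timeout seconds without a
    # request, so the flag is checked at least that often.
    server.timeout = 0.5
    while not stop_flag.is_set():
        server.handle_request()
    server.server_close()  # stop accepting new connections
```

Because handle_request() deals with one request at a time, a request that
is already being served finishes before the flag is looked at again. If
the server runs in its own process, a multiprocessing.Event could play
the same role.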
-- 
Piet van Oostrum <piet at cs.uu.nl>
URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
Private email: piet at vanoostrum.org


