Multiprocessing.Queue - I want to end.

MRAB google at mrabarnett.plus.com
Thu Apr 30 17:57:48 EDT 2009


Luis Zarrabeitia wrote:
> Hi. I'm building a script that closely follows a producer-consumer model. In 
> this case, the producer is disk-bound and the consumer is cpu-bound, so I'm 
> using the multiprocessing module (python2.5 with the multiprocessing backport 
> from Google Code) to speed up the processing (two consumers, one per core, 
> and one producer). The consumers are two multiprocessing.Process instances, 
> the producer is the main script, and the data is sent using a 
> multiprocessing.Queue instance (with bounded capacity).
> 
> The problem: when there is no more data to process, how can I signal the 
> consumers to consume until the queue is empty and then stop consuming? I need 
> them to do some clean-up work after they finish (and then I need the main 
> script to summarize the results).
> 
> Currently, the script looks like this:
> 
> ===
> from multiprocessing import Queue, Process
> 
> def consumer(filename, queue):
>     outfile = open(filename,'w')
>     for data in iter(queue.get, None):
>         process_data(data, outfile) # stores the result in the outfile
>     outfile.close()
>     cleanup_consumer(filename)
> 
> if __name__ == "__main__":
>     queue = Queue(100)
>     p1 = Process(target=consumer, args=("file1.txt", queue))
>     p2 = Process(target=consumer, args=("file2.txt", queue))
>     p1.start(); p2.start()
>     for item in read_from_disk(): # this is the disk-bound operation
>         queue.put(item)
>     queue.put(None); queue.put(None)
>     p1.join() # Wait until both consumers finish their work
>     p2.join()
>     # Tried to put this one before... but then the 'get' raises
>     # an exception, even if there are still items to consume.
>     queue.close() 
>     summarize() # very fast, no need to parallelize this.
> ===
> 
> As you can see, I'm sending one 'None' per consumer, and hoping that no 
> consumer will read more than one None. While this particular implementation 
> ensures that, it is very fragile. Is there any way to signal the consumers? 
> (or better yet, the queue itself, as it is shared by all consumers?) 
> Should "close" work for this? (raise the exception when the queue is 
> exhausted, not when it is closed by the producer).
> 
The producer could send just one None to indicate that it has finished
producing.

Each consumer could get the data from the queue, but if it's None it
should put it back in the queue for the other consumer(s), then clean up
and finish.

When all the consumers have finished, the queue will contain just the
single None.
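A minimal sketch of this single-sentinel scheme, assuming Python 3's
multiprocessing rather than the 2.5 backport. Squaring the items, the
`results` queue and `run_pipeline` are hypothetical stand-ins for
process_data(), the per-consumer output files, cleanup_consumer() and
summarize() from the original script.

```python
import multiprocessing as mp

SENTINEL = None  # the single "no more data" marker, as suggested above


def consumer(queue, results):
    """Consume until the sentinel appears, then hand it on and stop."""
    total = 0
    while True:
        item = queue.get()
        if item is SENTINEL:
            # Put the sentinel back so the next consumer also sees it.
            queue.put(SENTINEL)
            break
        total += item * item  # hypothetical stand-in for process_data()
    # Hypothetical stand-in for writing the outfile / cleanup_consumer().
    results.put(total)


def run_pipeline(items, n_consumers=2):
    """Hypothetical driver: one producer (this process), n consumers."""
    queue = mp.Queue(100)   # bounded, like Queue(100) in the post
    results = mp.Queue()
    workers = [mp.Process(target=consumer, args=(queue, results))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for item in items:      # stand-in for read_from_disk()
        queue.put(item)
    queue.put(SENTINEL)     # exactly one sentinel, whatever n_consumers is
    # Drain the results before joining, so no child blocks on a full pipe.
    partials = [results.get() for _ in workers]
    for w in workers:
        w.join()
    # After every consumer has finished, only the single sentinel is left.
    leftover = queue.get()
    return sum(partials), leftover


if __name__ == "__main__":
    total, leftover = run_pipeline(range(10), n_consumers=2)
    print(total, leftover)  # prints "285 None"
```

Because the producer puts every item before the one sentinel, a consumer
that sees None knows nothing is left to take, so the number of consumers
no longer has to match the number of sentinels. The `__main__` guard is
needed on platforms where multiprocessing starts children with "spawn",
since the main module is re-imported in each child.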


