Multiprocessing.Queue - I want to end.

Aaron Brady castironpi at gmail.com
Fri May 1 17:51:04 EDT 2009


On Apr 30, 3:49 pm, Luis Zarrabeitia <ky... at uh.cu> wrote:
> Hi. I'm building a script that closely follows a producer-consumer model. In
> this case, the producer is disk-bound and the consumer is cpu-bound, so I'm
> using the multiprocessing module (python2.5 with the multiprocessing backport
> from google.code) to speed up the processing (two consumers, one per core,
> and one producer). The consumers are two multiprocessing.Process instances,
> the producer is the main script, and the data is sent using a
> multiprocessing.Queue instance (with bounded capacity).
>
> The problem: when there is no more data to process, how can I signal the
> consumers to consume until the queue is empty and then stop consuming? I need
> them to do some clean-up work after they finish (and then I need the main
> script to summarize the results)
snip
>     for data in iter(queue.get, None):
>         process_data(data, outfile) # stores the result in the outfile
snip
>     queue.put(None); queue.put(None)
snip
> As you can see, I'm sending one 'None' per consumer, and hoping that no
> consumer will read more than one None. While this particular implementation
> ensures that, it is very fragile. Is there any way to signal the consumers?
> (or better yet, the queue itself, as it is shared by all consumers?)
> Should "close" work for this? (raise the exception when the queue is
> exhausted, not when it is closed by the producer).

You may have to write the consumer loop by hand, rather than using
'for ... in iter(queue.get, None)'.  In the same-process (threaded)
case, you can do this:

producer:
    sentinel = object()            # unique marker, created once by the producer

consumer:
    while True:
        item = queue.get()
        if item is sentinel:       # identity test works within one process
            break
        process_data(item)         # ...the rest of the consumer's work

Then, each consumer is guaranteed to consume no more than one
sentinel, and thus producing one sentinel per consumer will halt them
all.
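
Filled in, the same-process version might look like this (a sketch
using threads and a plain Queue; process_data and the two-worker
count are just stand-ins for your setup):

import threading
from Queue import Queue            # 'from queue import Queue' on Python 3

def process_data(item):
    pass                           # placeholder for the CPU-bound work

def consumer(q, sentinel):
    while True:
        item = q.get()
        if item is sentinel:       # identity holds: one process, one object
            break
        process_data(item)

q = Queue(maxsize=10)
sentinel = object()
workers = [threading.Thread(target=consumer, args=(q, sentinel))
           for _ in range(2)]
for w in workers:
    w.start()
for item in range(100):            # stand-in for the disk-bound producer
    q.put(item)
for _ in workers:
    q.put(sentinel)                # one sentinel per consumer
for w in workers:
    w.join()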

However, with multiple processes, the comparison to 'sentinel' will
fail, since each subprocess gets a copy, not the original, of the
sentinel.  A sample program which sent the same object multiple times
produced this output:

<object object at 0x00B8A388>
<object object at 0x00B8A3A0>
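
A program along these lines shows the effect (my sketch, not
necessarily the exact code that produced the output above; the
addresses will differ from run to run):

import multiprocessing

def worker(q):
    print(q.get())                 # a copy, rebuilt by unpickling in the child
    print(q.get())                 # a second, independent copy

if __name__ == '__main__':
    q = multiprocessing.Queue()
    sentinel = object()
    q.put(sentinel)                # the "same" object, sent twice
    q.put(sentinel)
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()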

Theoretically, you could send a shared object, which would satisfy the
identity test in the subprocess.  That failed with this exception:

  File "c:\programs\python30\lib\multiprocessing\queues.py", line 51,
in __getstate__
    assert_spawning(self)
...
RuntimeError: Queue objects should only be shared between processes th
rough inheritance
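
The traceback suggests the shared object was (or contained) a
multiprocessing.Queue.  Anything put on a Queue has to be pickled,
and pickling a Queue outside of process spawning raises exactly that
error; my guess at a minimal trigger:

import multiprocessing, pickle

shared = multiprocessing.Queue()   # a shared object, hoping identity survives the trip
pickle.dumps(shared)               # RuntimeError: Queue objects should only be
                                   # shared between processes through inheritance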

As a result, your options are more complicated.  I think the best
option is to wrap the data in a tuple with a flag.  Instead of
sending 'item', send '(True, item)'.  Then, when the producer is
finished, send one '(False, <any>)' per consumer.  The consumer
breaks when it sees 'False' in the first position.
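
A sketch of that (process_data and the two-consumer count are
stand-ins for your script):

import multiprocessing

def process_data(item):
    pass                           # placeholder for the CPU-bound work

def consumer(q):
    while True:
        more, item = q.get()
        if not more:               # (False, <anything>) means: no further data
            break
        process_data(item)
    # per-consumer clean-up would go here

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=10)
    workers = [multiprocessing.Process(target=consumer, args=(q,))
               for _ in range(2)]
    for w in workers:
        w.start()
    for item in range(100):        # stand-in for the disk-bound producer
        q.put((True, item))
    for _ in workers:
        q.put((False, None))       # one terminator per consumer
    for w in workers:
        w.join()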

An alternative is to spawn a watchman thread in each subprocess,
which merely blocks on a shared Event object, then sets a per-process
variable and adds a dummy object to the queue.  Because the producer
only sets the Event after it has put the last of the data, the dummy
is guaranteed to arrive after the real items.  Each process consumes
no more than one dummy, so they will all wake up.
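
Roughly like this (a sketch only; 'watchman' and the None dummy are
my placeholders, None is assumed never to occur as real data, and for
brevity the consumer checks the inherited Event directly when it sees
the dummy instead of a separate per-process variable):

import multiprocessing, threading

def process_data(item):
    pass                           # placeholder for the CPU-bound work

def consumer(q, done):
    def watchman():
        done.wait()                # block until the producer signals "finished"
        q.put(None)                # dummy item: wakes one blocking get()

    t = threading.Thread(target=watchman)
    t.setDaemon(True)
    t.start()
    while True:
        item = q.get()
        if item is None and done.is_set():
            break                  # the dummy, only ever queued after 'done' is set
        process_data(item)
    # per-consumer clean-up would go here

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=10)
    done = multiprocessing.Event()
    workers = [multiprocessing.Process(target=consumer, args=(q, done))
               for _ in range(2)]
    for w in workers:
        w.start()
    for item in range(100):        # stand-in for the disk-bound producer
        q.put(item)
    done.set()                     # all data queued; each watchman adds one dummy
    for w in workers:
        w.join()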

If you don't like those, you could just use a time-out on the get.
When it expires, check a shared variable, such as a one-element
array, and then check whether the queue is empty.  If the shared flag
is set and the queue is empty, there is no more data.
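
A sketch of that, using get with a timeout and a shared Value as the
one-element flag (an Array of length one would do as well;
Queue.Empty is the exception name on Python 2, queue.Empty on 3):

import multiprocessing
from Queue import Empty            # 'from queue import Empty' on Python 3

def process_data(item):
    pass                           # placeholder for the CPU-bound work

def consumer(q, finished):
    while True:
        try:
            item = q.get(timeout=0.5)
        except Empty:
            if finished.value and q.empty():
                break              # producer done and nothing left to read
            continue               # just a slow producer; keep waiting
        process_data(item)

if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=10)
    finished = multiprocessing.Value('b', 0)   # the shared one-element flag
    workers = [multiprocessing.Process(target=consumer, args=(q, finished))
               for _ in range(2)]
    for w in workers:
        w.start()
    for item in range(100):        # stand-in for the disk-bound producer
        q.put(item)
    finished.value = 1             # set only after the last put
    for w in workers:
        w.join()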

I'm curious how these work and what you decide.


