Ending data exchange through multiprocessing pipe

MRAB google at mrabarnett.plus.com
Wed Apr 22 12:33:49 EDT 2009


Michal Chruszcz wrote:
> Hi,
> 
> I am adding support for parallel processing to an existing program
> which fetches some data and then performs some computation with
> results saved to a database. Everything went just fine until I wanted
> to gather all of the results from the subprocesses.
> 
> My first idea was to use a queue. I've got many
> producers (all of the workers) and one consumer. Seems quite simple,
> but it isn't, at least for me. I presumed that each worker would put()
> its results on the queue and finally close() it, while the
> parent process would get() them as long as there was an active
> subprocess. So I did this:
> 
> >>> from multiprocessing import Process, Queue, active_children
> >>>
> >>> def f(q):
> ...     q.put(1)
> ...     q.close()
> ...
> >>> queue = Queue()
> >>> Process(target=f, args=(queue,)).start()
> >>> while len(active_children()) > 0:
> ...     print queue.get()
> ...
> 1
> 
> This (of course?) hangs after the first iteration of the loop. Delaying
> the second iteration with a sleep() call fixes the problem, since
> the result of active_children() gets refreshed in the meantime, but
> that's not a solution. One could iterate over the exact number of
> subprocesses I have, but let's presume that information isn't
> available.
> 
> After failing with queues I decided to try pipes, and
> again I found behavior that is at least surprising and not
> obvious. When I use a pipe within a single process everything works
> perfectly:
> 
> >>> from multiprocessing import Pipe
> >>> parent, child = Pipe()
> >>> child.send(1)
> >>> child.close()
> >>> parent.recv()
> 1
> >>> child.closed
> True
> >>> parent.recv()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
> EOFError
> 
> The problems appear in subprocess communication using pipes, though.
> 
> >>> from multiprocessing import Process, Pipe
> >>> def f(child):
> ...     child.send(1)
> ...     child.close()
> ...
> >>> parent, child = Pipe()
> >>> Process(target=f, args=(child,)).start()
> >>> parent.recv()
> 1
> >>> child.closed
> False
> >>> parent.recv()
> 
> ... and it hangs. I have no idea how to fix this, nor of any workaround
> that would solve my problem.
> 
> Most probably I'm missing something in the philosophy of multiprocessing,
> but I couldn't find anything covering such a situation. I'd appreciate
> any kind of hint on this topic, as it has become a riddle I just have to
> solve. :-)
> 
You could do this:

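     from multiprocessing import Process, Queue, active_children
     from Queue import Empty

     # f is the worker function from your first example
     queue = Queue()
     Process(target=f, args=(queue,)).start()
     while active_children():
         try:
             # wake up every second to re-check for live children
             print queue.get(timeout=1)
         except Empty:
             pass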
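
One caveat with that loop, as far as I can tell: if a worker puts its
result and exits before the parent next calls active_children(), the loop
can stop while results are still sitting in the queue. Below is a minimal
sketch of a variant that drains the queue once every child has gone. It
is written for Python 3 (hence print() and "from queue import Empty"),
and worker(), gather() and run() are names made up for illustration, not
anything from your program.

```python
from multiprocessing import Process, Queue, active_children
from queue import Empty  # on Python 2 this would be "from Queue import Empty"


def worker(q, value):
    # Hypothetical stand-in for the real fetch-and-compute step.
    q.put(value * value)


def gather(q, poll_interval=0.1):
    """Collect results until no child is alive, then drain what is left."""
    results = []
    while active_children():
        try:
            results.append(q.get(timeout=poll_interval))
        except Empty:
            pass  # nothing yet; go round and check the children again
    # Every child has exited, so nothing new can arrive. Pick up any
    # results that were still queued when the loop above stopped.
    while True:
        try:
            results.append(q.get_nowait())
        except Empty:
            break
    return results


def run(values):
    q = Queue()
    for v in values:
        Process(target=worker, args=(q, v)).start()
    return sorted(gather(q))


if __name__ == "__main__":
    print(run([1, 2, 3]))
```

This does not need to know how many workers there are. The price is the
polling interval, which only affects how quickly the parent notices that
the last child has finished.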

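As for the pipe example, my understanding is that the second
parent.recv() hangs because the parent process still holds its own open
handle on the child end of the pipe, which is also why child.closed is
False there. recv() only raises EOFError once every handle on the other
end has been closed. A sketch of the usual workaround, assuming Python 3
and one pipe per worker (worker() and collect() are hypothetical names):

```python
from multiprocessing import Process, Pipe


def worker(conn, values):
    # Hypothetical stand-in for the real work: send each result, then close.
    for v in values:
        conn.send(v * v)
    conn.close()


def collect(values):
    parent_end, child_end = Pipe()
    p = Process(target=worker, args=(child_end, values))
    p.start()
    # The parent still holds its own handle on the child's end of the pipe.
    # Close it here, otherwise the pipe never looks closed from this side
    # and recv() keeps waiting instead of raising EOFError.
    child_end.close()

    results = []
    while True:
        try:
            results.append(parent_end.recv())
        except EOFError:
            break  # every handle on the other end is closed
    p.join()
    return results


if __name__ == "__main__":
    print(collect([1, 2, 3]))
```

With several workers you would keep one such pipe per worker and read
each until EOFError, so no count of results is needed in advance.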