Multiprocessing problem

larudwer larudwer at freenet.de
Wed Mar 3 12:49:36 EST 2010


Hello Matt

I think the problem is here:

        for n in xrange(100000):
            outqueue.put(str(n))        <-- fill the queue with 100000 elements
            try:
                r = inqueue.get_nowait() <-- queue is still empty because processes need some time to start
                results.append(r)
            except Empty:
                pass <-- causing 100000 passes

....

        print "-"
        for task in tasks:
            outqueue.put(None)  <-- put even more data in the queue
...
# In the meantime the processes start to run and try to put data
# into the output queue. However, this queue might fill up and block
# all processes that try to write data to the already full queue.

        print "joining"
        for task in tasks:
            task.join()            <-- can never succeed because the processes are waiting for someone to read the result queue
        print "joined"

This example works:

from Queue import Empty, Full
from multiprocessing import Queue, Process
from base64 import b64encode
import time, random

class Worker(Process):
    """Process that base64-encodes queued items until told to stop.

    Items are taken from ``inqueue`` one at a time. Each item is encoded
    with ``b64encode`` and the result is put on ``outqueue``. A ``None``
    item is the stop signal; after it is seen, the number of items that
    were encoded is put on ``outqueue`` as the last message.
    """

    def __init__(self, inqueue, outqueue):
        """Store the queue to read work from and the queue to write to."""
        super(Worker, self).__init__()
        self.inqueue, self.outqueue = inqueue, outqueue

    def run(self):
        """Encode items until the ``None`` sentinel, then report the count."""
        source, sink = self.inqueue, self.outqueue
        handled = 0

        item = source.get()
        while item is not None:
            handled += 1
            sink.put(b64encode(item))
            item = source.get()

        # Clean-up code goes here
        sink.put(handled)

class Supervisor(object):
    """Feeds work to a pool of Worker processes and collects their output."""

    def __init__(self):
        pass

    def go(self):
        """Run the demo: start workers, queue the work, drain, then join.

        Four ``Worker`` processes are started. 100000 stringified integers
        are queued for them, followed by one ``None`` stop marker per
        worker. Everything the workers send back is appended to a local
        ``results`` list, whose length is printed before the workers are
        joined.

        Note that the names are from the supervisor's point of view:
        ``outqueue`` carries work *to* the workers and ``inqueue`` carries
        results *back*, so they are passed to ``Worker`` in swapped roles.
        """
        outqueue = Queue()
        inqueue = Queue()
        tasks = [Worker(outqueue, inqueue) for _ in xrange(4)]
        for task in tasks:
            task.start()

        results = []
        print("*")
        for n in xrange(100000):
            outqueue.put(str(n))

        print("-")
        for task in tasks:
            outqueue.put(None)

        print("emptying queue")
        # Draining with get_nowait() until the first Empty is a race: the
        # queue is momentarily empty whenever the workers are slower than
        # this loop, so the drain could stop early and the join below would
        # then hang on workers that still have unread results buffered.
        # Each worker sends its integer item count as its very last message
        # (encoded items are never ints), so block until one count per
        # worker has arrived -- at that point every result has been read.
        pending = len(tasks)
        while pending:
            r = inqueue.get()
            results.append(r)
            if isinstance(r, int):
                pending -= 1
        print("done")
        print(len(results))

        print("joining")
        # Safe now: nothing is left in the workers' queue buffers.
        for task in tasks:
            task.join()
        print("joined")

if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    Supervisor().go()






More information about the Python-list mailing list