multiprocessing, queue

Chris Angelico rosuav at gmail.com
Fri May 8 09:34:47 EDT 2015


On Fri, May 8, 2015 at 8:08 PM, Michael Welle <mwe012008 at gmx.net> wrote:
> Hello,
>
> what's wrong with [0]? As num_tasks gets higher, proc.join() seems to
> block forever. First I thought the magical frontier was around 32k tasks,
> but then it seemed to work with 40k tasks. Now I'm stuck around 7k
> tasks. I think I'm doing something fundamentally wrong, but I can't find it.
>
> Regards
> hmw
>
> [0] http://pastebin.com/adfBYgY9

Your code's small enough to include inline, so I'm doing that:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

class Foo(Process):

    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.result_queue = result_queue
        self.task_queue = task_queue

    def run(self):
        while True:
            n = self.task_queue.get()
            if n is None:
                break

            self.result_queue.put(1)

        return


def main():
    results = Queue()
    tasks = Queue()
    procs = []
    num_procs = 8
    num_tasks = 8000

    for i in range(num_procs):
        proc = Foo(tasks, results)
        procs.append(proc)

    for proc in procs:
        proc.start()

    for i in range(num_tasks):
        tasks.put(i)

    for i in range(num_procs):
        tasks.put(None)

    for proc in procs:
        print("join")
        proc.join()

    while not results.empty():
        result = results.get()
        print('Result: {}'.format(result))


if __name__ == '__main__':
    main()

# -- end of code --


First thing I'd look at is how the queue does its buffering. A
multiprocessing.Queue() has no Python-level size limit by default, but
it ships items to the other side through a pipe with a finite OS
buffer, and a process that has put items on a queue won't terminate
until all of them have been flushed into that pipe (the docs warn
about this under "Joining processes that use queues"). So if you
join() your workers while thousands of unread results are still
buffered, the join can block forever, which would also explain why
the threshold you're hitting moves around. If you really want all
your results to pile up in the queue like that, you'd have to drain
the queue before joining; much better would be to have each job post
something on the result queue when it's done, and then you wait till
they're all done:

from multiprocessing import Process, Queue

def foo(task_queue, result_queue):
    while True:
        n = task_queue.get()
        if n is None: break   # None on the task queue means "shut down"
        result_queue.put(1)
    # Signal "this worker is done" with one final None.
    # Make sure None is not a possible actual result;
    # otherwise, create an object() to use as a flag.
    result_queue.put(None)

def feed_tasks(num_tasks, num_procs, tasks):
    for i in range(num_tasks):
        tasks.put(i)

    # One sentinel per worker, so every worker gets told to stop.
    for i in range(num_procs):
        tasks.put(None)

def main():
    results = Queue()
    tasks = Queue()
    num_procs = 8
    num_tasks = 8000
    procs = [Process(target=foo, args=(tasks, results))
             for i in range(num_procs)]

    for proc in procs: proc.start()

    Process(target=feed_tasks, args=(num_tasks, num_procs, tasks)).start()

    # Pump the result queue BEFORE joining; count down as workers finish.
    while num_procs:
        result = results.get()
        if result is None: num_procs -= 1
        else: print('Result: {}'.format(result))

    for proc in procs:
        print("join")
        proc.join()

if __name__ == '__main__':
    main()


I've also made a few other changes (for instance, no need to subclass
Process just to pass args), but the most important parts are a
result_queue.put() just before the process ends, and switching the
order of the result-queue-pump and process-join loops.
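
For the record, here's what I mean about not needing the subclass.
This is a toy sketch of my own (the work() function is hypothetical);
the two spellings below start identical workers:

from multiprocessing import Process

def work(n):
    print('working on', n)

class Worker(Process):           # the subclassing style from your post
    def __init__(self, n):
        Process.__init__(self)
        self.n = n
    def run(self):
        work(self.n)

if __name__ == '__main__':
    Worker(1).start()                        # subclass style
    Process(target=work, args=(2,)).start()  # target= style, no subclass

The target= form saves you the boilerplate of stashing the arguments
on self just so run() can see them.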

The rewritten main() might still block, though, at the point where the
tasks are being put onto the queue; so I've spun the task-feeding off
into its own process. (That might not be necessary, depending on how
your tasks work.) But I tested this on 200,000 tasks (with the
printing of results replaced with a simple counter), and it worked
fine, churning through the work in about ten seconds.
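
Incidentally, the original hang is easy to reproduce in isolation.
This is a sketch of my own, not from your post, and the exact number
of items needed depends on your OS's pipe buffer size:

from multiprocessing import Process, Queue

def worker(q):
    for i in range(100000):   # enough to overflow the pipe buffer
        q.put(i)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    p.join()   # hangs: the child can't exit until q is drained

Drain the queue first (or move the join after the result pump, as
above) and it finishes immediately.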

As a general rule, queues need to have both ends operating
simultaneously; otherwise one end is likely to block. In theory, your
code should work even with ridiculously small queue sizes; the only
cost is concurrency (everything would spend its time waiting on the
queue, so your tasks would all be taking turns). I tested this by
changing the Queue() calls to Queue(1), and the code took about twice
as long to complete. :)
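
If you want to repeat that experiment, the only change is the queue
constructors. Here's a hypothetical harness (it assumes main() has
been tweaked to accept a maxsize and pass it to both Queue() calls,
which isn't in the code above):

import time

def timed(maxsize):
    start = time.time()
    main(maxsize)   # hypothetical: main() changed to build
                    # Queue(maxsize) for both tasks and results
    return time.time() - start

if __name__ == '__main__':
    print('unbounded: {:.1f}s'.format(timed(0)))  # maxsize <= 0 means unbounded
    print('maxsize=1: {:.1f}s'.format(timed(1)))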

Hope that helps!

ChrisA


