multiprocessing problems

Nils Ruettershoff nils at ccsg.de
Wed Jan 20 18:30:21 EST 2010


Hi Doxa,

DoxaLogos wrote:
[...]
> I found out my problems.  One thing I did was followed the test queue
> example in the documentation, but the biggest problem turned out to be
> a pool instantiated globally in my script was causing most of the
> endless process spawn, even with the "if __name__ == "__main__":"
> block.
>   

Problems that solve themselves are the best problems ;)
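
For the archives, the fix is to create the pool only inside the guard, so the spawned child interpreters don't re-create it when they import the module. A minimal sketch (untested; process_file is a hypothetical placeholder for the real per-file work, and get_all_files() is the same pseudo function as in the example further down):

import multiprocessing

def process_file(filename):
    # hypothetical placeholder for the real per-file work
    pass

if __name__ == '__main__':
    # creating the pool here, not at module level, keeps the
    # child interpreters from spawning pools of their own
    pool = multiprocessing.Pool(processes=4)
    pool.map(process_file, get_all_files())
    pool.close()
    pool.join()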

One tip: your current algorithm has some overhead, because for every 
batch you start four additional Python interpreters, process the files, 
shut all of the spawned interpreters down, and then start four 
interpreters again for the next batch of files.

For this kind of job I prefer to start the processes once and feed them 
with data via a queue. This avoids the repeated spawning overhead and 
increases runtime performance.


It could look like this (not directly executable due to some pseudo 
functions, so untested):

import multiprocessing
import Queue

class Worker(multiprocessing.Process):
    def __init__(self, feeder_q, queue_filled):
        multiprocessing.Process.__init__(self)
        self.feeder_q = feeder_q
        self.queue_filled = queue_filled
   
    def run(self):
        serve = True
        # start infinite loop
        while serve:
            try:
                # fetch work from the queue; this blocks the process for
                # up to 5 seconds and raises Queue.Empty if no item has
                # arrived by then
                text = self.feeder_q.get(True, timeout=5)
                do_stuff(text)
                # very important! tell the queue that the fetched work
                # has been finished; task_done() must be called once per
                # get(), otherwise feeder_q.join() would block forever
                self.feeder_q.task_done()
            except Queue.Empty:
                # once all work has been enqueued and the queue is empty,
                # the process can terminate itself
                if self.queue_filled.is_set() and self.feeder_q.empty():
                    serve = False
        return


if __name__ == '__main__':
    number_of_processes = 4
    queue_filled = multiprocessing.Event()
    feeder_q = multiprocessing.JoinableQueue()
    process_list = []
    # get the file names which need to be processed
    all_files = get_all_files()
    # start the worker processes
    for i in xrange(number_of_processes):
        process = Worker(feeder_q, queue_filled)
        process.start()
        process_list.append(process)
    # start feeding
    for filename in all_files:
        feeder_q.put(filename)
    # inform the workers that all work has been enqueued
    queue_filled.set()
    # wait until the queue is empty
    feeder_q.join()
    # wait until all processes have finished their jobs
    for process in process_list:
        process.join()
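
A variation on the shutdown logic, in case you prefer it (again an 
untested sketch, with the same pseudo functions do_stuff() and 
get_all_files()): skip the Event and the 5-second timeout, and instead 
put one None "poison pill" per worker at the end of the queue; each 
worker exits as soon as it fetches a pill:

import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, feeder_q):
        multiprocessing.Process.__init__(self)
        self.feeder_q = feeder_q

    def run(self):
        while True:
            text = self.feeder_q.get()
            if text is None:
                # pill received: acknowledge it and terminate
                self.feeder_q.task_done()
                break
            do_stuff(text)
            self.feeder_q.task_done()

if __name__ == '__main__':
    number_of_processes = 4
    feeder_q = multiprocessing.JoinableQueue()
    workers = [Worker(feeder_q) for i in xrange(number_of_processes)]
    for w in workers:
        w.start()
    for filename in get_all_files():
        feeder_q.put(filename)
    # one pill per worker, placed after the real work
    for i in xrange(number_of_processes):
        feeder_q.put(None)
    feeder_q.join()
    for w in workers:
        w.join()

This way the workers block on get() without ever polling, and the 
queue_filled event is not needed at all.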



Cheers,
Nils


