Catching exceptions with multi-processing

Oscar Benjamin oscar.j.benjamin at gmail.com
Fri Jun 19 11:01:17 EDT 2015


On 19 June 2015 at 15:01, Fabien <fabien.maussion at gmail.com> wrote:
> Folks,
>
> I am developing a tool which works on individual entities (glaciers) and does
> a lot of operations on them. There are many tasks to do, one after the
> other, and each task follows the same interface:
>
> def task1(path_to_glacier_dir):
>     # open file1 in path_to_glacier_dir
>     # do stuff
>     if dont_work:
>         raise RuntimeError("didnt work")
>     # write file2 in path_to_glacier_dir
>
> This way, the tasks can be run in parallel very easily:
>
> import multiprocessing as mp
> pool = mp.Pool(4)
>
> dirs = [list_of_dirs]
> pool.map(task1, dirs, chunksize=1)
> pool.map(task2, dirs, chunksize=1)
> pool.map(task3, dirs, chunksize=1)
>
> ... and so forth. I tested the tool for about a hundred glaciers but now it
> has to run for thousands of them. There are going to be errors, some of them
> are even expected for special outliers. What I would like the tool to do is
> that in case of error, it writes the identifier of the problematic glacier
> somewhere, the error encountered and more info if possible. Because of
> multiprocessing, I can't write in a shared file, so I thought that the
> individual processes should write a unique "error file" in a dedicated
> directory.
>
> What I don't know, however, is how to do this at minimal cost and in
> a generic way for all tasks. Also, the task2 should not be run if task1
> threw an error. Sometimes (for debugging), I'd rather keep the normal
> behavior of raising an error and stopping the program.
>
> Do I have to wrap all tasks with a "try: except:" block? How to switch
> between behaviors? All the solutions I could think about look quite ugly to
> me. And it seems that this is a general problem that someone cleverer than
> me has solved before ;-)

A simple way to approach this could be something like:

#!/usr/bin/env python3

import math
import multiprocessing

def sqrt(x):
    if x < 0:
        return 'error', x
    else:
        return 'success', math.sqrt(x)

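if __name__ == "__main__":
    numbers = [1, 2, 3, -1, -3]
    # The with block shuts the pool down even if we raise below.
    with multiprocessing.Pool() as pool:
        # imap yields the results in input order.
        for ret, val in pool.imap(sqrt, numbers):
            if ret == 'error':
                raise ValueError(val)
            print(val)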

Just replace the raise statement with whatever you want to do (write
to a file etc). Since all errors are handled in the master process
there are no issues with writing to a file.
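
If the tasks raise exceptions rather than return a status, the same
idea could probably be made generic with a small wrapper. Here is a
rough sketch. Note that guarded, run_task and the stand-in task1 are
names made up for illustration; they are not part of multiprocessing,
and the real tasks would take a directory rather than a number:

```python
import multiprocessing
import traceback
from functools import partial


def task1(x):
    """Hypothetical stand-in for a real task; fails on negative input."""
    if x < 0:
        raise RuntimeError("didnt work")
    return x * 2


def guarded(task, arg):
    """Run task(arg) in the worker and report a failure instead of raising."""
    try:
        return arg, 'success', task(arg)
    except Exception:
        # Send the traceback back as a string, which always pickles.
        return arg, 'error', traceback.format_exc()


def run_task(pool, task, args, debug=False):
    """Return (list of args that succeeded, {failed arg: traceback text})."""
    good, failed = [], {}
    for arg, ret, val in pool.imap(partial(guarded, task), args):
        if ret == 'error':
            if debug:
                # Debugging mode: stop at the first error.
                raise RuntimeError(val)
            failed[arg] = val
        else:
            good.append(arg)
    return good, failed


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        good, failed = run_task(pool, task1, [1, 2, -1, 3])
        print(good)  # only these inputs would go on to the next task
        for arg, tb in failed.items():
            # The master process could write this to a log file instead.
            print(arg, tb.splitlines()[-1])
```

The list of good inputs is what you would pass to the next task, so a
glacier that failed in task1 never reaches task2. Passing debug=True
brings back the stop-at-first-error behaviour.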

--
Oscar


