ehacinom

Is there a way for workers in multiprocessing.Pool's apply_async to catch errors and continue?

When using multiprocessing.Pool's apply_async(), what happens when the worker code breaks? I think this only means exceptions, but there may be other things that make the worker functions fail.

import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())
for f in files:
    # args must be a tuple, e.g. (f,); a bare (*args) is a SyntaxError
    pool.apply_async(workerfunct, args=(f,), callback=callbackfunct)
pool.close()
pool.join()

As I understand it right now, the process/worker fails (all other processes continue) and anything past a thrown error is not executed, EVEN if I catch the error with try/except.

As an example, I'd usually catch errors with except, substitute a default value and/or print an error message, and let the code continue. If my callback function writes to a file, it then writes the default values.

An answer to another question touched on this:

I suspect the reason you're not seeing anything happen with your example code is because all of your worker function calls are failing. If a worker function fails, callback will never be executed. The failure won't be reported at all unless you try to fetch the result from the AsyncResult object returned by the call to apply_async. However, since you're not saving any of those objects, you'll never know the failures occurred. If I were you, I'd try using pool.apply while you're testing so that you see errors as soon as they occur.

Answers (1)

dano

If you're using Python 3.2+, you can use the error_callback keyword argument to handle exceptions raised in workers.

pool.apply_async(workerfunct, args=args, callback=callbackfunct, error_callback=handle_error)

handle_error will be called with the exception object as an argument.

If you're not, you have to wrap all your worker functions in a try/except to ensure your callback is executed. (I think you got the impression that this wouldn't work from my answer in that other question, but that's not the case. Sorry!):

def workerfunct(*args):
    try:
        # Stuff
        pass  # replace with the real work and return its result
    except Exception as e:
        # Do something here, maybe return e?
        return e

pool.apply_async(workerfunct, args=args, callback=callbackfunct)

You could also use a wrapper function if you can't/don't want to change the function you actually want to call:

def wrapper(func, *args):
    try:
        return func(*args)
    except Exception as e:
        return e

pool.apply_async(wrapper, args=(workerfunct,) + tuple(args), callback=callbackfunct)
