tabata

Reputation: 469

How to handle initializer error in multiprocessing.Pool?

When the initializer throws an exception, as below, the script never stops.
I would like to abort before the main work starts (i.e. never run 'do_something').

from multiprocessing import Pool
import contextlib

def initializer():
    raise Exception("init failed")

def do_something(args):
    # main process
    pass

pool = Pool(1, initializer=initializer)
with contextlib.closing(pool):
    try:
        pool.map_async(do_something, [1]).get(100)
    except Exception:
        pool.terminate()

The never-ending stack trace on the console looks like this:

...
Exception: init failed
Process ForkPoolWorker-18:
Traceback (most recent call last):
  File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/pool.py", line 103, in worker
    initializer(*initargs)
  File "hoge.py", line 5, in initializer
    raise Exception("init failed")
Exception: init failed
...

My workaround is to suppress the initializer error and return at the beginning of the worker function, using a global flag as below.
But I would like to learn a better way.

failed = False  # must exist even if initializer never runs, to avoid a NameError

def initializer():
    try:
        raise Exception("init failed")
    except Exception:
        global failed
        failed = True

def do_something(args):
    global failed
    if failed:
        # skip when initializer failed
        return
    # main process

Upvotes: 5

Views: 2548

Answers (2)

kmkaplan

Reputation: 18960

I just came across the same woe. My first solution was to catch the exception and re-raise it in the worker function (see below). But on second thought it really means that the initializer support of multiprocessing.Pool is broken and should not be used. So I now prefer to do the initialization directly in the worker.

from multiprocessing import Pool
import contextlib

_already_inited = False
def initializer():
    global _already_inited
    if _already_inited:
        return
    _already_inited = True
    raise Exception("init failed")

def do_something(args):
    initializer()
    # main process

pool = Pool(1)
with contextlib.closing(pool):
    pool.map_async(do_something, [1]).get(100)

Both the code and the stack trace are simpler. Of course, every worker function needs to call initializer().

My initial solution was to defer the exception to the worker function.

from multiprocessing import Pool
import contextlib, sys

failed = None
def initializer():
    try:
        raise Exception("init failed")
    except Exception:
        global failed
        failed = sys.exc_info()[1]

def do_something(args):
    global failed
    if failed is not None:
        raise RuntimeError(failed) from failed
    # main process

pool = Pool(1, initializer=initializer)
with contextlib.closing(pool):
    pool.map_async(do_something, [1]).get(100)

That way the caller still gets access to the exception.

multiprocessing.pool.RemoteTraceback:  
""" 
Traceback (most recent call last): 
  File "/tmp/try.py", line 7, in initializer 
    raise Exception("init failed") 
Exception: init failed 

The above exception was the direct cause of the following exception: 

Traceback (most recent call last): 
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 119, in worker 
    result = (True, func(*args, **kwds)) 
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar 
    return list(map(*args)) 
  File "/tmp/try.py", line 15, in do_something 
    raise RuntimeError(failed) from failed 
RuntimeError: init failed 
""" 

The above exception was the direct cause of the following exception: 

Traceback (most recent call last): 
  File "/tmp/try.py", line 20, in <module> 
    pool.map_async(do_something, [1]).get(100) 
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get 
    raise self._value 
RuntimeError: init failed 
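The key mechanism in the chained trace above is `raise RuntimeError(failed) from failed`: the `from` clause sets `__cause__` on the new exception, which is what produces the "direct cause of the following exception" sections. A minimal in-process illustration of the same pattern (names here are illustrative, not from the answer):

```python
def init_step():
    # Stand-in for a fragile initializer.
    raise Exception("init failed")

failed = None
try:
    init_step()
except Exception as exc:
    failed = exc  # remember the failure instead of crashing

def work():
    # Defer the error: re-raise later, chained to the original via __cause__.
    if failed is not None:
        raise RuntimeError("worker unusable") from failed
    return "ok"

caught = None
try:
    work()
except RuntimeError as err:
    caught = err  # caller sees both the wrapper and the original cause
```

The caller can inspect `caught.__cause__` to recover the original initializer exception.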

Upvotes: 3

TingQian LI

Reputation: 680

After navigating through the implementation of multiprocessing in PyCharm, I'm convinced that there is no better solution: Pool starts a thread running _maintain_pool(), which calls _repopulate_pool() whenever a worker process exits, whether it died accidentally or failed to initialize.

Check this out: Lib/multiprocessing/pool.py line 244
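Since the pool will keep repopulating dead workers no matter what, one pragmatic guard (a sketch of my own, not from the source linked above) is to probe the initializer in the parent process before building the pool, so a broken setup fails fast instead of entering the respawn loop:

```python
from multiprocessing import Pool

def make_pool_checked(processes, init, initargs=()):
    """Probe the initializer in the parent before creating the pool.

    If init() raises here, the exception propagates immediately and no
    pool (hence no worker-respawn loop) is ever started.
    """
    init(*initargs)
    return Pool(processes, initializer=init, initargs=initargs)
```

This only works when the initialization is cheap and safe to run one extra time in the parent (e.g. it doesn't open per-process resources that the parent would then hold).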

Upvotes: 5
