Reputation: 469
When initializer throw Error like below, script won't stop.
I would like to abort before starting main process(do not run 'do_something').
from multiprocessing import Pool
import contextlib
def initializer():
raise Exception("init failed")
def do_something(args):
# main process
pass
pool = Pool(1, initializer=initializer)
with contextlib.closing(pool):
try:
pool.map_async(do_something, [1]).get(100)
except:
pool.terminate()
The never stopping stacktrace on console is below
...
Exception: init failed
Process ForkPoolWorker-18:
Traceback (most recent call last):
File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/hoge/anaconda3/lib/python3.6/multiprocessing/pool.py", line 103, in worker
initializer(*initargs)
File "hoge.py", line 5, in initializer
raise Exception("init failed")
Exception: init failed
...
My workaround is suppressing initializer error and return at the beginning of the main process by using global flag like below.
But I would like to learn better one.
def initializer():
try:
raise Exception("init failed")
except:
global failed
failed = True
def do_something(args):
global failed
if failed:
# skip when initializer failed
return
# main process
Upvotes: 5
Views: 2548
Reputation: 18960
I just came across the same woe. My first solution was to catch the exception and raise it in the worker function (see below). But on second thought it really means that initializer
support of multiprocessing.Pool
is broken and sould not be used. So I now prefer to do the initialization stuff directly in the worker.
from multiprocessing import Pool
import contextlib, sys
_already_inited = False
def initializer():
global _already_inited
if _already_inited:
return
_already_inited = True
raise Exception("init failed")
def do_something(args):
initializer()
# main process
pool = Pool(1)
with contextlib.closing(pool):
pool.map_async(do_something, [1]).get(100)
Both the code and the stacktrace are simpler.
Off course all your worker function need to call initializer()
.
My initial solution was to defer the exception to the worker function.
from multiprocessing import Pool
import contextlib, sys
failed = None
def initializer():
try:
raise Exception("init failed")
except:
global failed
failed = sys.exc_info()[1]
def do_something(args):
global failed
if failed is not None:
raise RuntimeError(failed) from failed
# main process
pool = Pool(1, initializer=initializer)
with contextlib.closing(pool):
pool.map_async(do_something, [1]).get(100)
That way the caller still gets access to the exception.
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/tmp/try.py", line 15, in do_something
raise RuntimeError(failed)
RuntimeError: init failed
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/tmp/try.py", line 20, in <module>
pool.map_async(do_something, [1]).get(100)
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
RuntimeError: init failed
(venv) kmkaplan@dev1:~/src/options$ python3 /tmp/try.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/tmp/try.py", line 7, in initializer
raise Exception("init failed")
Exception: init failed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/tmp/try.py", line 15, in do_something
raise RuntimeError(failed) from failed
RuntimeError: init failed
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/tmp/try.py", line 20, in <module>
pool.map_async(do_something, [1]).get(100)
File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
RuntimeError: init failed
Upvotes: 3
Reputation: 680
After navigating through the implementation of multiprocessing using PyCharm, I'm convinced that there is no better solution, because Pool started a thread to _maintain_pool() by _repopulate_pool() if any worker process exists--either accidentally or failed to initialize.
Check this out: Lib/multiprocessing/pool.py line 244
Upvotes: 5