Reputation: 1971
I want to apply a function in parallel using multiprocessing.Pool. The problem is that if one function call triggers a segmentation fault, the Pool hangs forever. Does anybody have an idea how I can make a Pool that detects when something like this happens and raises an error?
The following example shows how to reproduce it (requires scikit-learn > 0.14):
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool

class Bad(object):
    tree_ = None

def fit_one(i):
    if i == 3:
        # this will segfault
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i

pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))

# we will never see 3
for o in out:
    print o
Upvotes: 17
Views: 4862
Reputation: 155363
This is a known bug, issue #22393, in Python. There is no meaningful workaround as long as you're using multiprocessing.Pool until it's fixed. A patch is available at that link, but it has not yet been integrated into the main line, so no stable release of Python fixes the problem.
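For reference, the hang can be reproduced without scikit-learn by killing a worker directly; a minimal POSIX-only sketch (os.kill with SIGSEGV stands in for the real segfault):

import os
import signal
from multiprocessing import Pool

def crash(i):
    if i == 3:
        os.kill(os.getpid(), signal.SIGSEGV)  # simulate the segfault
    return i

if __name__ == '__main__':
    pool = Pool(2)
    # with the unfixed Pool, this loop blocks forever once the worker dies
    for o in pool.imap_unordered(crash, range(10)):
        print(o)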
Upvotes: 3
Reputation: 9603
As described in the comments, this just works in Python 3 if you use concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool.
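A minimal sketch of that behavior (the crash function and its os.kill call are stand-ins for the real segfault): the executor notices the dead worker and raises BrokenProcessPool instead of hanging.

import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def crash(i):
    if i == 3:
        os.kill(os.getpid(), signal.SIGSEGV)  # stand-in for the segfault
    return i

if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        try:
            for o in executor.map(crash, range(10)):
                print(o)
        except BrokenProcessPool:
            print('a worker died; the executor raised instead of hanging')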
If you're stuck on Python 2, the best option I've found is to use the timeout argument on the result objects returned by Pool.apply_async and Pool.map_async. For example:
pool = Pool(2)
# apply_async gives one result object per task, each with its own timeout
results = [pool.apply_async(fit_one, (i,)) for i in range(10)]
for res in results:
    print res.get(timeout=1000)  # allow 1000 seconds max per task
This works as long as you have an upper bound for how long a child process should take to complete a task.
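If a worker segfaults, its result never arrives, so get eventually raises multiprocessing.TimeoutError, which you can catch to tear the pool down instead of waiting forever. A minimal sketch, assuming fit_one from the question and an arbitrary 5-second bound:

from multiprocessing import Pool, TimeoutError

if __name__ == '__main__':
    pool = Pool(2)
    results = [pool.apply_async(fit_one, (i,)) for i in range(10)]
    try:
        for res in results:
            print(res.get(timeout=5))  # per-task upper bound
    except TimeoutError:
        pool.terminate()  # kill the remaining workers
        raise RuntimeError('a task timed out; a worker may have crashed')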
Upvotes: 3
Reputation: 699
I haven't run your example to see if it can handle the error, but try concurrent.futures. Simply replace my_function(i) with your fit_one(i). Keep the if __name__ == '__main__': structure; concurrent.futures seems to need it. The code below is tested on my machine, so it will hopefully work straight up on yours.
import concurrent.futures

def my_function(i):
    print('function running')
    return i

def run():
    number_processes = 4
    executor = concurrent.futures.ProcessPoolExecutor(number_processes)
    futures = [executor.submit(my_function, i) for i in range(10)]
    concurrent.futures.wait(futures)
    for f in futures:
        print(f.result())

if __name__ == '__main__':
    run()
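Note that if one of the submitted tasks segfaults its worker, the pending f.result() calls raise BrokenProcessPool rather than hanging, so the loop above fails loudly instead of blocking forever.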
Upvotes: 0
Reputation: 5078
Instead of using Pool().imap(), maybe you would rather create the child processes yourself with Process(). I bet the returned object would allow you to get the liveness status of any child, so you will know if one hangs up.
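A minimal sketch of that idea, assuming fit_one from the question and running the tasks one at a time for simplicity; exitcode is the negative signal number when a child is killed, e.g. -11 for SIGSEGV:

from multiprocessing import Process, Queue

def worker(i, q):
    q.put(fit_one(i))  # fit_one as defined in the question

if __name__ == '__main__':
    q = Queue()
    for i in range(10):
        p = Process(target=worker, args=(i, q))
        p.start()
        p.join()  # or join(timeout) and poll p.is_alive() to detect hangs
        if p.exitcode != 0:
            raise RuntimeError('task %d died with exitcode %d' % (i, p.exitcode))
        print(q.get())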
Upvotes: 1