Peter Prettenhofer

Reputation: 1971

multiprocessing.Pool hangs if child causes a segmentation fault

I want to apply a function in parallel using multiprocessing.Pool. The problem is that if one function call triggers a segmentation fault, the Pool hangs forever. Does anybody have an idea how I can make a Pool that detects when something like this happens and raises an error?

The following example shows how to reproduce it (requires scikit-learn > 0.14):

import numpy as np
from sklearn.ensemble import gradient_boosting
import time

from multiprocessing import Pool

class Bad(object):
    tree_ = None


def fit_one(i):
    if i == 3:
        # this will segfault
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i


pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
    print o

Upvotes: 17

Views: 4862

Answers (4)

ShadowRanger

Reputation: 155363

This is a known Python bug, issue #22393. There is no meaningful workaround as long as you're using multiprocessing.Pool until it's fixed. A patch is available on that issue, but it has not yet been integrated into the main release, so no stable release of Python fixes the problem.

Upvotes: 3

shoyer

Reputation: 9603

As described in the comments, this just works in Python 3 if you use concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool: when a worker dies abruptly, the pending futures raise BrokenProcessPool instead of hanging.
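A minimal sketch of that behaviour, assuming Python 3 on a POSIX system. crash_on_three and run_with_executor are hypothetical names, and os.kill with SIGSEGV is only a stand-in for the real crash inside predict_stages. The fork start method is pinned here just so the sketch can run as a plain script; that is a choice of this example, not something the approach requires.

```python
import concurrent.futures
import multiprocessing
import os
import signal
from concurrent.futures.process import BrokenProcessPool


def crash_on_three(i):
    # Hypothetical stand-in for fit_one: kill this worker with SIGSEGV when i == 3.
    if i == 3:
        os.kill(os.getpid(), signal.SIGSEGV)
    return i


def run_with_executor(values):
    # Assumption: POSIX only, because the "fork" start method is requested explicitly.
    ctx = multiprocessing.get_context("fork")
    done, failed = [], []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        # Map each future back to its input so failures can be reported by value.
        futures = {executor.submit(crash_on_three, v): v for v in values}
        for future in concurrent.futures.as_completed(futures):
            try:
                done.append(future.result())
            except BrokenProcessPool:
                # Raised for the crashed task and for tasks still pending at that point.
                failed.append(futures[future])
    return sorted(done), sorted(failed)
```

When the worker dies, result() raises BrokenProcessPool for the crashed task and for any task that had not finished yet, so the caller gets an error instead of a hang. Once the pool is broken, submit() raises the same exception, so a fresh executor is needed to retry.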

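If you're stuck on Python 2, the best option I've found is to use the timeout argument of get() on the result objects returned by Pool.apply_async and Pool.map_async. For example:

pool = Pool(2)
result = pool.map_async(fit_one, range(10))
# map_async returns one AsyncResult for the whole batch; get() raises
# multiprocessing.TimeoutError if the results are not ready in time
for o in result.get(timeout=1000):  # allow 1000 seconds max
    print o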

This works as long as you have an upper bound for how long a child process should take to complete a task.
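The timeout idea can also be sketched in Python 3, assuming a POSIX system. This version submits each task with apply_async so that a timeout on one result does not discard the others. crash_on_three and run_with_timeout are hypothetical names, and the SIGSEGV sent with os.kill stands in for the real crash.

```python
import multiprocessing
import os
import signal


def crash_on_three(i):
    # Hypothetical stand-in for fit_one: kill this worker with SIGSEGV when i == 3.
    if i == 3:
        os.kill(os.getpid(), signal.SIGSEGV)
    return i


def run_with_timeout(values, timeout):
    # Assumption: POSIX only, because the "fork" start method is requested explicitly.
    ctx = multiprocessing.get_context("fork")
    done, lost = [], []
    with ctx.Pool(2) as pool:
        # One AsyncResult per task, paired with its input value.
        pending = [(v, pool.apply_async(crash_on_three, (v,))) for v in values]
        for v, result in pending:
            try:
                done.append(result.get(timeout=timeout))
            except multiprocessing.TimeoutError:
                # The result never arrived, most likely because the worker died.
                lost.append(v)
    # Leaving the with block calls pool.terminate().
    return done, lost
```

The result for the crashed task is never delivered, so its get() raises multiprocessing.TimeoutError once the timeout expires. The timeout has to be longer than the slowest legitimate task, otherwise healthy tasks are reported as lost too.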

Upvotes: 3

Ninga

Reputation: 699

I haven't run your example to see whether it handles the error, but try concurrent.futures. Simply replace my_function(i) with your fit_one(i). Keep the if __name__ == '__main__': structure; concurrent.futures seems to need it. The code below is tested on my machine, so it will hopefully work as-is on yours.

import concurrent.futures

def my_function(i):
    print('function running')
    return i

def run():
    number_processes = 4
    executor = concurrent.futures.ProcessPoolExecutor(number_processes)
    futures = [executor.submit(my_function, i) for i in range(10)]
    concurrent.futures.wait(futures)

    for f in futures:
        print(f.result())

if __name__ == '__main__':
    run()

Upvotes: 0

ArekBulski

Reputation: 5078

Instead of using Pool().imap(), you might prefer to create the child processes yourself with Process(). The Process object lets you check the liveness of any child through is_alive() and exitcode, so you will know if one has hung or died.
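A rough sketch of that approach, assuming Python 3 on a POSIX system, where a child killed by signal N reports an exitcode of -N. crash_on_three and run_child are hypothetical names, and the os.kill call stands in for the real segfault.

```python
import multiprocessing
import os
import signal


def crash_on_three(i):
    # Hypothetical stand-in for fit_one: kill this process with SIGSEGV when i == 3.
    if i == 3:
        os.kill(os.getpid(), signal.SIGSEGV)


def run_child(target, arg):
    # Assumption: POSIX only, because the "fork" start method is requested explicitly.
    ctx = multiprocessing.get_context("fork")
    child = ctx.Process(target=target, args=(arg,))
    child.start()
    child.join()  # returns as soon as the child exits, cleanly or not
    # 0 means a clean exit; a negative value -N means the child was killed by signal N.
    return child.exitcode
```

A parent can compare the returned value with -signal.SIGSEGV to tell a segfault from a normal exit. Getting return values back from the child would need a Queue or Pipe, which this sketch leaves out.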

Upvotes: 1
