Medhat

Reputation: 1652

Right way to exit from function called by multiprocessing.Pool?

How do I exit from a function called by multiprocessing.Pool?

Here is an example of the code I am using. I added a condition to exit from the worker function, but when I run this as a script in a terminal, it hangs and never exits.

import multiprocessing

def worker(n):
    if n == 4:
        exit("wrong number")  # sys.exit(1) did not work either
    return n*2

def caller(mylist, n=1):
    # Use n processes if n > 1, otherwise one per CPU core
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    result = pool.map(worker, mylist)
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    l = [2, 3, 60, 4]
    myresult = caller(l, 4)

Upvotes: 3

Views: 5720

Answers (2)

martineau

Reputation: 123453

As I said, I don't think you can exit the process running the main script from a worker process.
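
A minimal sketch of that limitation, assuming a plain multiprocessing.Process instead of a Pool (the names child and run_child are made up for illustration): calling sys.exit() in the child only ends the child and sets its exit code, while the parent keeps running.

```python
import multiprocessing
import sys

def child():
    # SystemExit raised here ends only this child process.
    sys.exit(3)

def run_child():
    proc = multiprocessing.Process(target=child)
    proc.start()
    proc.join()
    # The parent is still alive and can inspect the child's exit code.
    return proc.exitcode

if __name__ == '__main__':
    print("child exit code:", run_child())
    print("parent still running")
```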

You haven't explained exactly why you want to do this, so this answer is a guess, but perhaps raising a custom exception and handling it in an explicit except clause, as shown below, would be an acceptable way to work around the limitation.

import multiprocessing
import sys

class WorkerStopException(Exception):
    pass

def worker(n):
    if n == 4:
        raise WorkerStopException()
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    try:
        result = pool.map(worker, mylist)
    except WorkerStopException:
        sys.exit("wrong number")
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    l = [2, 3, 60, 4]
    myresult = caller(l, 4)

Output displayed when run:

4
wrong number

(The 4 is the number of CPUs my system has.)

Upvotes: 3

Darkonaut

Reputation: 21654

The catch with pool.map is that it re-raises exceptions from child processes only after all tasks have finished. Your comments, however, sound like you need to abort all processing immediately, as soon as a wrong value is detected in any process. That is a job for pool.apply_async.
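
A rough way to see the pool.map behaviour, using made-up names (slow_or_fail, timed_map) and arbitrary sleep times: one task fails immediately while the others sleep, yet the exception should only reach the caller once the sleeping tasks are done.

```python
import time
from multiprocessing import Pool

def slow_or_fail(x):
    if x == 0:
        raise ValueError(f"wrong number: {x}")  # fails right away
    time.sleep(0.5)  # every other task takes about half a second
    return x * 2

def timed_map():
    """Return (error message or None, seconds until pool.map returned or raised)."""
    start = time.perf_counter()
    with Pool(4) as pool:
        try:
            # chunksize=1 so that each argument is its own task
            pool.map(slow_or_fail, range(4), chunksize=1)
        except ValueError as e:
            return str(e), time.perf_counter() - start
    return None, time.perf_counter() - start

if __name__ == '__main__':
    error, elapsed = timed_map()
    print(f"{error!r} surfaced after {elapsed:.2f}s")
```

The elapsed time is at least the sleep duration even though the failing task raised at once, which is why pool.map is a poor fit when an immediate abort is needed.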

pool.apply_async accepts an error_callback, which you can use to terminate the pool. Workers are fed item by item instead of chunk by chunk as with the pool.map variants, so you get a chance to exit early after each processed argument.

I'm basically reusing the code from another answer of mine:

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    if x == 4:
        raise ValueError(f'wrong number: {x}')
    return x * 2

def on_error(e):
    if type(e) is ValueError:
        global terminated
        terminated = True
        pool.terminate()
        print(f"oops: {type(e).__name__}('{e}')")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(10)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())


if __name__ == '__main__':
    main()

Output:

f(0)
f(1)
f(2)
f(3)
f(4)
oops: ValueError('wrong number: 4')

Process finished with exit code 0
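
If the module-level globals are undesirable, one possible variation (a different technique from the error_callback code above, and only a sketch) is to iterate over pool.imap_unordered inside a with block. The helper name first_failure and the scaled-down sleep times are assumptions for illustration. The exception is re-raised in the parent when the failing result arrives, and leaving the with block terminates the pool.

```python
from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x * 0.1)  # scaled-down delays compared with the example above
    if x == 4:
        raise ValueError(f'wrong number: {x}')
    return x * 2

def first_failure(values, processes=4):
    """Return (results gathered so far, error message or None)."""
    done = []
    # Leaving the with-block calls pool.terminate(), which stops the workers.
    with Pool(processes) as pool:
        try:
            # The default chunksize is 1, so results arrive one argument at a time.
            for result in pool.imap_unordered(f, values):
                done.append(result)
        except ValueError as e:
            return done, str(e)
    return done, None

if __name__ == '__main__':
    done, error = first_failure(range(10))
    print(done, error)
```

Which results end up in the list before the failure depends on scheduling, so the partial results should not be relied on to be complete or ordered.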

Upvotes: 1
