Reputation: 1652
How to exit from a function called by multiprocessing.Pool
Here is an example of the code I am using. I added a condition to the worker function that is supposed to exit the program.
When I run this as a script in a terminal, it hangs and never exits.
import multiprocessing

def worker(n):
    if n == 4:
        exit("wrong number")  # sys.exit(1) did not work either
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    result = pool.map(worker, mylist)
    pool.close()
    pool.join()
    return result

l = [2, 3, 60, 4]
myresult = caller(l, 4)
Upvotes: 3
Views: 5720
Reputation: 123453
As I said, I don't think you can exit the process running the main script from a worker process.
You haven't explained exactly why you want to do this, so this answer is a guess, but perhaps raising a custom Exception and handling it in an explicit except clause, as shown below, would be an acceptable way to work around the limitation.
import multiprocessing
import sys

class WorkerStopException(Exception):
    pass

def worker(n):
    if n == 4:
        raise WorkerStopException()
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    try:
        result = pool.map(worker, mylist)
    except WorkerStopException:
        sys.exit("wrong number")
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    l = [2, 3, 60, 4]
    myresult = caller(l, 4)
Output displayed when run:
4
wrong number
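(The 4 is the value of n_cores printed by caller; here it comes from the n=4 argument, which also happens to be the number of CPUs my system has.)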
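A possible variation on the same idea, offered as a sketch rather than a definitive fix: using the pool as a context manager means it is terminated when the block is left, including when map() re-raises the worker's exception in the parent. The n_cores default of 2 and the two sample lists are assumptions made for illustration.

```python
import multiprocessing


class WorkerStopException(Exception):
    """Raised by a worker to signal that processing should stop."""


def worker(n):
    if n == 4:
        raise WorkerStopException()
    return n * 2


def caller(mylist, n_cores=2):
    # Leaving the with-block calls pool.terminate(), also when map() raises.
    with multiprocessing.Pool(processes=n_cores) as pool:
        return pool.map(worker, mylist)


if __name__ == "__main__":
    for values in ([2, 3, 60], [2, 3, 60, 4]):
        try:
            print(caller(values))
        except WorkerStopException:
            # The main script could call sys.exit("wrong number") here instead.
            print("wrong number")
```

The exception still crosses the process boundary through map(), so the decision to exit stays in the main process, as in the answer above.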
Upvotes: 3
Reputation: 21654
The thing with pool.map is that it raises exceptions from child processes only after all tasks are finished. But your comments sound like you need to abort all processing immediately, as soon as a wrong value is detected in any process. This would be a job for pool.apply_async.
pool.apply_async offers an error_callback parameter, which you can use to terminate the pool. Workers are fed item by item instead of chunk-wise as with the pool.map variants, so you get the chance for an early exit on each processed argument.
I'm basically reusing my answer from here:
from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    if x == 4:
        raise ValueError(f'wrong number: {x}')
    return x * 2

def on_error(e):
    if type(e) is ValueError:
        global terminated
        terminated = True
        pool.terminate()
        print(f"oops: {type(e).__name__}('{e}')")

def main():
    global pool
    global terminated
    terminated = False
    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(10)]
    pool.close()
    pool.join()
    if not terminated:
        for r in results:
            print(r.get())

if __name__ == '__main__':
    main()
Output:
f(0)
f(1)
f(2)
f(3)
f(4)
oops: ValueError('wrong number: 4')
Process finished with exit code 0
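If the globals are a concern, a different technique can give a similar early exit: pool.imap_unordered hands out tasks one at a time by default and re-raises a worker's exception in the parent when that task's result is reached. This is a minimal sketch of that alternative, not the method used in this answer; run() and its (results, error) return shape are hypothetical names chosen for illustration.

```python
from multiprocessing import Pool


def f(x):
    if x == 4:
        raise ValueError(f"wrong number: {x}")
    return x * 2


def run(values, processes=2):
    """Return (results, error_message); hypothetical helper for illustration."""
    results = []
    with Pool(processes) as pool:
        try:
            # Results arrive in completion order, one task at a time.
            for value in pool.imap_unordered(f, values):
                results.append(value)
        except ValueError as e:
            # Returning leaves the with-block, which terminates the pool.
            return None, str(e)
    return results, None


if __name__ == "__main__":
    print(run(range(3)))
    print(run(range(10)))
```

Tasks that are already running when the error surfaces may still finish or be interrupted by the termination, so this suits work that is safe to abandon midway.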
Upvotes: 1