Reputation: 5565
I am using multiprocessing in Python, via pool.apply_async, to run a function simultaneously with various different arguments.
The relevant extract of the code is:
import multiprocessing as mp
all_details_to_process_full = [[x, y, z], [x2, y2, z2]]

def loop_over_desired_sub(arg_list):
    ...

if __name__ == '__main__':
    pool = mp.Pool(processes=10)
    desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,))
                       for arg_list in all_details_to_process_full]
    results = [p.get() for p in desired_content]
As far as I can tell, the default behavior is that Python only stops the code when an error is raised by the earliest subprocess that was initialized.
For example, if there are 10 items in the list to be processed with separate subprocesses, and there is an error in processing the first item (i.e. the first subprocess that was initialized), Python will immediately raise the error, stopping the code. If, however, there is an error in the second subprocess, that subprocess will stop, but the rest of the code will carry on until the first item has finished, at which point the error gets raised and the code stops. (If an error is raised while processing the third item, then both items one and two need to finish before the error gets raised.)
Is there a way to change this behavior, both for:
Any error raised in any of the subprocesses to stop the code immediately
The code not to stop when an error is raised, until all subprocesses have finished
Upvotes: 1
Views: 217
Reputation: 4467
Each of your processes is independent here because you use apply_async. Thus the default behavior of Python is to process them independently, meaning that one failing does not affect the others.
The issue here is that you process the results of your function loop_over_desired_sub in an ordered way.
The get method will block until the result of the first operation is retrieved (even if the 2nd process has already returned or failed). Then it will process the 2nd value, raising an error if needed.
import multiprocessing as mp
import time

def fail_in(args):
    # Raise for the one task whose index x matches the chosen value l,
    # otherwise pretend to work for half a second and return the index.
    x, l = args
    if x == l:
        raise RuntimeError(x)
    time.sleep(.5)
    print("Finish process {}".format(x))
    return x

if __name__ == '__main__':
    # Fail in the first submitted task: the error is raised right away.
    pool = mp.Pool(processes=3)
    tasks = [(i, 0) for i in range(9)]
    try:
        desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
        t1 = time.time()
        results = [p.get() for p in desired_content]
    except RuntimeError:
        print("apply_async 0 failed in {:4.2}s".format(time.time() - t1))
    pool.terminate()

    # Fail in the second task: get() first waits for task 0 to finish.
    pool = mp.Pool(processes=3)
    tasks = [(i, 1) for i in range(9)]
    try:
        desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
        t1 = time.time()
        results = [p.get() for p in desired_content]
    except RuntimeError:
        print("apply_async 1 failed in {:4.2}s".format(time.time() - t1))
    pool.terminate()

    # Fail in the fifth task: tasks 0-3 must be retrieved before the error.
    pool = mp.Pool(processes=3)
    tasks = [(i, 4) for i in range(9)]
    try:
        desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
        t1 = time.time()
        results = [p.get() for p in desired_content]
    except RuntimeError:
        print("apply_async 4 failed in {:4.2}s".format(time.time() - t1))
    pool.terminate()
Note that the remaining processes are not killed by this error. You can see this by submitting a new job to the pool without calling terminate: it will only start after all the remaining tasks from your previous job are done.
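A minimal sketch of that check, reusing the fail_in helper and the imports from the block above (the exact timing is illustrative, not guaranteed):
if __name__ == '__main__':
    pool = mp.Pool(processes=3)
    tasks = [(i, 0) for i in range(9)]
    desired_content = [pool.apply_async(fail_in, args=(a,)) for a in tasks]
    try:
        results = [p.get() for p in desired_content]
    except RuntimeError:
        # Do not terminate: the 8 non-failing tasks keep running in the pool.
        pass
    # This new job is queued behind the tasks that are still pending, so it
    # only starts once the pool has worked through them.
    late = pool.apply_async(fail_in, args=((42, -1),))
    print(late.get())  # prints 42 only after the earlier tasks complete
    pool.close()
    pool.join()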
To get a faster error notification, you could use the method imap_unordered, which will raise an error as soon as one is returned. You have to be careful, as you need to carry a job id with each task to recover the original order.
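A sketch of that approach, again reusing fail_in from above; fail_in_with_id is a hypothetical wrapper that keeps the submission index next to each result:
def fail_in_with_id(indexed_args):
    # Hypothetical wrapper: carry the job id alongside the result so the
    # original submission order can be rebuilt from the unordered output.
    job_id, args = indexed_args
    return job_id, fail_in(args)

if __name__ == '__main__':
    pool = mp.Pool(processes=3)
    tasks = [(i, 4) for i in range(9)]   # the task with index 4 raises
    ordered_results = {}
    try:
        # Results arrive in completion order, so the exception from the
        # failing task is raised here as soon as that result is ready,
        # without first collecting every earlier task in submission order.
        for job_id, value in pool.imap_unordered(fail_in_with_id, enumerate(tasks)):
            ordered_results[job_id] = value
    except RuntimeError as e:
        print("a task failed: {!r}".format(e))
        pool.terminate()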
You could also get the notification using the error_callback argument of apply_async to perform clean-up in that case.
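For the 1st behavior (stop everything as soon as any subprocess fails), a sketch using error_callback (Python 3) could look like this, reusing mp and fail_in from above and assuming it is acceptable to terminate the whole pool once a failure is flagged:
import threading

if __name__ == '__main__':
    pool = mp.Pool(processes=3)
    failed = threading.Event()

    def on_error(exc):
        # Runs in the parent process as soon as any task raises.
        print("a task failed: {!r}".format(exc))
        failed.set()

    tasks = [(i, 4) for i in range(9)]
    async_results = [pool.apply_async(fail_in, args=(a,), error_callback=on_error)
                     for a in tasks]

    # Poll until either something failed or every task is done.
    while not failed.is_set() and not all(r.ready() for r in async_results):
        failed.wait(0.1)
    if failed.is_set():
        pool.terminate()   # kill the remaining workers immediately
    else:
        pool.close()
    pool.join()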
For the 2nd behavior, waiting for all the results to be processed before dealing with the error, you can just use:
desired_content = [pool.apply_async(loop_over_desired_sub, args=(arg_list,))
                   for arg_list in all_details_to_process_full]
results = []
for p in desired_content:
    try:
        r = p.get()
    except Exception as e:
        # Keep the exception as this task's result instead of re-raising,
        # so the loop carries on until every task has been collected.
        r = e
    results.append(r)
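With that loop, results holds either the return value or the exception for each task, so you can check afterwards which ones failed, for example:
failures = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]
for i, exc in failures:
    print("task {} failed: {!r}".format(i, exc))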
Upvotes: 2