Reputation: 57
I have a script that is essentially an API scraper; it runs perpetually. I strapped a map_async pool to it and it's glorious, but the pool was hiding some errors, which I learned is pretty common. So I incorporated this wrapper helper function:
helper.py
import functools
import traceback

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
    return wrapped_func
My main script looks like this:
scraper.py
import multiprocessing as mp
from helper import trace_unhandled_exceptions

start_block = 100
end_block = 50000

@trace_unhandled_exceptions
def main(block_num):
    block = blah_blah(block_num)
    return block

if __name__ == "__main__":
    cpus = min(8, mp.cpu_count()-1 or 1)
    pool = mp.Pool(cpus)
    pool.map_async(main, range(start_block - 20, end_block), chunksize=cpus)
    pool.close()
    pool.join()
This works great; I'm now receiving the exception:
Exception in main
Traceback (most recent call last):
.....
How can I get the whole script to end on an exception? I've tried incorporating os._exit or sys.exit into the helper function like this:
import functools
import os
import traceback

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
            os._exit(1)
    return wrapped_func
But I believe it's only terminating the child process and not the entire script. Any advice?
Upvotes: 1
Views: 3175
Reputation: 123443
I don't think you need that trace_unhandled_exceptions decorator to do what you want, at least not if you use pool.apply_async() instead of pool.map_async(), because then you can use the error_callback= option it supports to be notified whenever the target function fails. Note that map_async() also supports an error_callback, but it isn't called until all of the submitted tasks have finished, so it would not be suitable for what you're wanting to do.
I got the idea for this approach from @Tim Peters' answer to a similar question titled "Multiprocessing Pool - how to cancel all running processes if one returns the desired result?"
import multiprocessing as mp
import random
import time

START_BLOCK = 100
END_BLOCK = 1000

def blah_blah(block_num):
    if block_num % 10 == 0:
        print(f'Processing block {block_num}')
    time.sleep(random.uniform(.01, .1))
    return block_num

def main(block_num):
    if random.randint(0, 100) == 42:
        print('Raising random exception')
        raise RuntimeError('RANDOM TEST EXCEPTION')
    block = blah_blah(block_num)
    return block

def error_handler(exception):
    # Runs in the parent process (in the pool's result-handler thread)
    # as soon as any single task fails; `pool` is the global created below.
    print(f'{exception} occurred, terminating pool.')
    pool.terminate()

if __name__ == "__main__":
    processes = min(8, mp.cpu_count()-1 or 1)
    pool = mp.Pool(processes)
    for i in range(START_BLOCK-20, END_BLOCK):
        pool.apply_async(main, (i,), error_callback=error_handler)
    pool.close()
    pool.join()
    print('-fini-')
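To make the timing difference between map_async's and apply_async's error_callback concrete, here is a minimal sketch that records how many other tasks had already finished when the callback fired. It assumes multiprocessing.pool.ThreadPool as a stand-in for mp.Pool: ThreadPool subclasses Pool and shares its callback machinery, and using threads lets a plain counter be shared. The names slow_or_fail and completed_when_error_reported are hypothetical.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

completed = 0
lock = threading.Lock()


def slow_or_fail(n):
    # Task 0 fails immediately; every other task sleeps briefly, then counts itself.
    global completed
    if n == 0:
        raise RuntimeError("task 0 failed")
    time.sleep(0.05)
    with lock:
        completed += 1
    return n


def completed_when_error_reported(use_map_async, num_tasks=8):
    """Return how many tasks had finished when error_callback first ran."""
    global completed
    completed = 0
    seen = []

    def on_error(exc):
        with lock:
            seen.append(completed)

    with ThreadPool(2) as pool:
        if use_map_async:
            pool.map_async(slow_or_fail, range(num_tasks), chunksize=1,
                           error_callback=on_error)
        else:
            for i in range(num_tasks):
                pool.apply_async(slow_or_fail, (i,), error_callback=on_error)
        pool.close()
        pool.join()  # joining the result handler guarantees callbacks have run
    return seen[0]


print(completed_when_error_reported(use_map_async=True))
print(completed_when_error_reported(use_map_async=False))
```

With map_async the callback reports 7, meaning it waited for every other task to finish; with apply_async it typically reports 0, because the failing task's callback runs as soon as that one task fails, which is what lets error_handler terminate the pool early.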
Upvotes: 2
Reputation: 44108
I am not sure what you mean by the pool hiding errors. My experience is that when a worker function (i.e., the target of a Pool method) raises an uncaught exception, it doesn't go unnoticed. Anyway, ...
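One likely reason the errors seemed hidden in the question's scraper.py: map_async returns an AsyncResult that stores the worker's exception, and the exception is only re-raised in the parent when .get() is called, which the original script never does. The sketch below shows both cases. It assumes multiprocessing.pool.ThreadPool (same map_async implementation as Pool) so it runs without pickling or __main__-guard concerns; flaky and collect are hypothetical names.

```python
from multiprocessing.pool import ThreadPool


def flaky(n):
    # Hypothetical worker: fails on one input, succeeds otherwise.
    if n == 3:
        raise ValueError(f"bad block {n}")
    return n * 10


def collect(call_get):
    """Run map_async and report what the caller actually sees.

    call_get=False mirrors scraper.py: the ValueError stays stored inside
    the AsyncResult and nothing is reported.
    call_get=True: .get() re-raises the worker's exception in the caller.
    """
    with ThreadPool(2) as pool:
        async_result = pool.map_async(flaky, range(6))
        async_result.wait()  # block until every task has finished
        if not call_get:
            return "no error surfaced"
        try:
            return async_result.get()
        except ValueError as exc:
            return f"re-raised: {exc}"


print(collect(call_get=False))  # no error surfaced
print(collect(call_get=True))   # re-raised: bad block 3
```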
I would suggest that:
- You do not use the trace_unhandled_exceptions decorator, and instead allow your worker function, main, to raise an exception.
- Instead of map_async (why that instead of map?), you use method imap, which allows you to iterate over individual return values, and any exception that may have been thrown by main, as they become available. As soon as you detect an exception, you can then call multiprocessing.Pool.terminate() to (1) cancel any tasks that have been submitted but not started and (2) kill any tasks that are running but not yet completed.

As an aside, even if you don't call terminate, once an uncaught exception occurs in a submitted task, the processing pool flushes the input task queue. Once the main process detects the exception, it can, of course, call sys.exit() after cleaning up the pool.
import multiprocessing as mp

start_block = 100
end_block = 50000

def main(block_num):
    if block_num == 1000:
        raise ValueError("I don't like 1000.")
    return block_num * block_num

if __name__ == "__main__":
    cpus = min(8, mp.cpu_count()-1 or 1)
    pool = mp.Pool(cpus)
    it = pool.imap(main, range(start_block - 20, end_block), chunksize=cpus)
    results = []
    while True:
        try:
            result = next(it)
        except StopIteration:
            break
        except Exception as e:
            print(e)
            # Kill remaining tasks
            pool.terminate()
            break
        else:
            results.append(result)
    pool.close()
    pool.join()
Prints:
I don't like 1000.
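The same imap pattern can be written with a plain for loop, and wrapped in a function that returns an exit status, so the main guard can hand it straight to sys.exit() as suggested above. This is a minimal sketch: run_blocks, square and the pool_factory parameter are hypothetical names. pool_factory defaults to multiprocessing.pool.ThreadPool only so the sketch runs anywhere; in the scraper you would pass multiprocessing.Pool from inside the if __name__ == "__main__": block.

```python
from multiprocessing.pool import ThreadPool


def square(block_num):
    # Same toy worker as the answer's main(): it rejects block 1000.
    if block_num == 1000:
        raise ValueError("I don't like 1000.")
    return block_num * block_num


def run_blocks(start, end, results, pool_factory=ThreadPool, processes=4):
    """Append results in order; stop at the first worker exception.

    Returns 0 on success and 1 on failure, suitable for sys.exit().
    """
    pool = pool_factory(processes)
    try:
        # chunksize=1 so a failure only discards the failing block's result.
        for result in pool.imap(square, range(start, end), chunksize=1):
            results.append(result)
    except Exception as exc:
        # imap re-raises the worker's exception here, in the parent.
        print(f"Worker failed: {exc}")
        pool.terminate()  # cancel queued tasks (a process Pool also kills running workers)
        return 1
    else:
        pool.close()  # all tasks done; let workers exit normally
        return 0
    finally:
        pool.join()  # runs after terminate() or close(), before returning
```

In the scraper that would look like `sys.exit(run_blocks(start_block - 20, end_block, results, pool_factory=mp.Pool))`, which ends the whole script with a non-zero status once any block fails. chunksize=1 keeps every successful result before the failure; a larger chunksize, as in the answer, is more efficient but also discards the other results in the failing chunk.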
Alternatively, you can keep your decorator function, but modify it to return the Exception instance it caught (currently, it implicitly returns None). Then you can modify the while True loop as follows:
while True:
    try:
        result = next(it)
    except StopIteration:
        break
    else:
        if isinstance(result, Exception):
            pool.terminate()
            break
        results.append(result)
Since no actual exception has been raised in the main process, the call to terminate becomes absolutely essential if you want to continue execution without allowing the remaining submitted tasks to run. Even if you just want to exit immediately, it is still a good idea to terminate and clean up the pool to ensure that nothing hangs when you do call exit.
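The answer describes the modified decorator but doesn't show it, so here is a minimal sketch of one way to write it: it still prints the traceback in the worker, but returns the caught exception instead of None, and the loop stops on the first Exception instance it sees. The run helper and its pool_factory parameter are hypothetical; ThreadPool is the default only so the sketch runs without a __main__ guard, and multiprocessing.Pool can be passed instead (functools.wraps keeps main picklable by name).

```python
import functools
import traceback
from multiprocessing.pool import ThreadPool


def trace_unhandled_exceptions(func):
    """Print the traceback in the worker, then hand the exception back."""
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print('Exception in ' + func.__name__)
            traceback.print_exc()
            return exc  # returned as an ordinary result, not raised
    return wrapped_func


@trace_unhandled_exceptions
def main(block_num):
    if block_num == 1000:
        raise ValueError("I don't like 1000.")
    return block_num * block_num


def run(start, end, pool_factory=ThreadPool, processes=4):
    results = []
    pool = pool_factory(processes)
    for result in pool.imap(main, range(start, end), chunksize=1):
        if isinstance(result, Exception):
            # Nothing was raised in the parent, so stop the pool explicitly.
            pool.terminate()
            break
        results.append(result)
    else:
        pool.close()  # loop finished without a failure
    pool.join()
    return results
```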
Upvotes: 1