Reputation: 35607
How can I handle KeyboardInterrupt events with Python's multiprocessing Pools? Here is a simple example:
from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()
When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.
I want to be able to press ^C at any time and cause all of the processes to exit gracefully.
Upvotes: 170
Views: 104332
Reputation: 44313
Many of these answers are old and/or they do not seem to work with later versions of Python (I am running 3.8.5) on Windows if you are executing a method such as Pool.map, which blocks until all the submitted tasks have completed. The following is my solution.
1. Call signal.signal(signal.SIGINT, signal.SIG_IGN) in the main process to ignore Ctrl-C altogether.
2. In each pool process, the global variable ctrl_c_entered will be set to False and a call to signal.signal(signal.SIGINT, signal.SIG_IGN) will be issued to initially ignore Ctrl-C. The return value from this call will be saved; this is the original, default handler that, when re-established, allows handling of KeyboardInterrupt exceptions.
3. A decorator, handle_ctrl_c, can be used to decorate multiprocessing functions and methods that should exit immediately on Ctrl-C being entered. This decorator will test whether the global ctrl_c_entered flag is set and, if so, not even bother to run the function/method and instead return a KeyboardInterrupt exception instance. Otherwise a try/except handler for KeyboardInterrupt will be established and the decorated function/method will be invoked. If Ctrl-C is entered, the global ctrl_c_entered will be set to True and a KeyboardInterrupt exception instance will be returned. In any event, before returning, the decorator will re-establish the SIG_IGN handler.

In essence, all submitted tasks will be allowed to start but will immediately terminate with a return value of a KeyboardInterrupt exception once Ctrl-C has been entered. The main process can test the return values for the presence of such a value to detect whether Ctrl-C was entered.
from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Prints:
Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
Upvotes: 18
Reputation: 49
You can try using the apply_async method of a Pool object, like this:
import multiprocessing
import time
from datetime import datetime

def test_func(x):
    time.sleep(2)
    return x**2

def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])
        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)
    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)
Output:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
An advantage of this method is that results processed before interruption will be returned in the results dictionary:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
Upvotes: 4
Reputation: 101
I'm a newbie in Python. I was looking everywhere for an answer and stumbled upon this and a few other blogs and YouTube videos. I tried to copy-paste the author's code above and reproduce it on Python 2.7.13 on 64-bit Windows 7. It's close to what I want to achieve.
I made my child processes ignore Ctrl-C and let the parent process terminate. Bypassing the child processes seems to avoid this problem for me.
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results

if __name__ == '__main__':
    go()
The part starting at pool.terminate() never seems to execute.
Upvotes: 5
Reputation: 15060
The voted answer does not tackle the core issue but a similar side effect.
Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.
import signal
from multiprocessing import Pool

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

pool = Pool(initializer=initializer)

# perform_download and downloads are assumed to be defined elsewhere
try:
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
Upvotes: 19
Reputation: 101
Usually this simple structure works for Ctrl-C on a Pool:
import signal

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)
As was stated in a few similar posts:
Capture keyboardinterrupt in Python without try-except
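A slightly fuller sketch of the handler idea for Python 3.8+, under a few assumptions: the handler records the interrupt in a flag instead of letting KeyboardInterrupt be raised, and the names interrupted and install_handler are made up for illustration. What to do once the flag is set (for example, calling pool.terminate()) is left to the caller.

```python
import signal
import threading

# Hypothetical flag; set once Ctrl-C (SIGINT) has been received.
interrupted = threading.Event()


def signal_handle(_signal, frame):
    """Record the interrupt instead of raising KeyboardInterrupt."""
    interrupted.set()
    print("Stopping the jobs.")


def install_handler():
    """Install the handler; return the previous one so it can be restored."""
    return signal.signal(signal.SIGINT, signal_handle)


if __name__ == "__main__":
    previous = install_handler()
    signal.raise_signal(signal.SIGINT)      # simulate Ctrl-C (Python 3.8+)
    print("interrupted:", interrupted.is_set())
    signal.signal(signal.SIGINT, previous)  # restore the original handler
```

Note that signal.signal can only be called from the main thread.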
Upvotes: 10
Reputation: 1034
From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.
import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main():
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
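For reference, a self-contained Python 3 sketch of the same pattern could look like this. It is not the code from the linked post: slowly_square is borrowed from the question, run is a hypothetical helper, and the pool size of 4 is arbitrary.

```python
import multiprocessing
import signal
import time


def init_worker():
    """Runs once in each worker: ignore Ctrl-C so only the parent handles it."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def slowly_square(i):
    time.sleep(0.1)
    return i * i


def run(pool, items):
    """Map over items; on Ctrl-C terminate the pool, then always clean up.

    (run is a hypothetical helper name used only in this sketch.)
    """
    try:
        return pool.map(slowly_square, items)
    except KeyboardInterrupt:
        pool.terminate()  # stop the workers immediately
        raise
    finally:
        pool.close()      # has no effect if the pool was already terminated
        pool.join()       # wait for the worker processes to exit


if __name__ == "__main__":
    pool = multiprocessing.Pool(4, init_worker)
    try:
        print(run(pool, range(20)))
    except KeyboardInterrupt:
        print("You cancelled the program!")
```

All the cleanup lives in the parent; the workers contain no error handling at all, which is the point of this approach.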
Upvotes: 67
Reputation: 57544
This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
The KeyboardInterrupt exception won't be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.
Note that this doesn't happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace
results = pool.map(slowly_square, range(40))
with
results = pool.map_async(slowly_square, range(40)).get(9999999)
or similar.
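Wrapped up as a Python 3 helper, the workaround might look like the sketch below. The name map_interruptibly is made up, slowly_square comes from the question, and the one-million-second default is just an arbitrary "effectively forever" value. It is smaller than 9999999 because timeouts above threading.TIMEOUT_MAX raise OverflowError, and that limit is platform dependent.

```python
import multiprocessing
import time


def slowly_square(i):
    time.sleep(0.1)
    return i * i


def map_interruptibly(pool, func, items, timeout=1_000_000):
    """Like pool.map, but waits with a timeout (hypothetical helper)."""
    return pool.map_async(func, items).get(timeout)


if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    try:
        print(map_interruptibly(pool, slowly_square, range(20)))
    except KeyboardInterrupt:
        pool.terminate()
        print("You cancelled the program!")
    finally:
        pool.close()  # has no effect if the pool was already terminated
        pool.join()
```

If the timeout does expire, get() raises multiprocessing.TimeoutError, so the value should comfortably exceed the longest expected run.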
Upvotes: 145
Reputation: 2787
It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don't finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn't catch exceptions that don't inherit from Exception (e.g., SystemExit). So here's my solution that deals with both of these:
import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout. This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Upvotes: 5
Reputation: 4422
I found, for the time being, the best solution is to not use the multiprocessing.pool feature but rather roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
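To give a rough idea of what rolling your own pool can look like (this is a Python 3 sketch under my own assumptions, not the code from the linked post): worker processes pull items from a task queue until they see a None sentinel, and the parent alone decides when to terminate them. The names worker and run, and the sentinel convention, are illustrative.

```python
import multiprocessing
import time


def slowly_square(i):
    time.sleep(0.1)
    return i * i


def worker(tasks, results):
    """Pull items from tasks until the None sentinel; push (item, result)."""
    try:
        while True:
            item = tasks.get()
            if item is None:
                break
            results.put((item, slowly_square(item)))
    except KeyboardInterrupt:
        pass  # exit quietly; the parent does the cleanup


def run(items, num_workers=4):
    """Process a sequence of items with hand-managed worker processes."""
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(tasks, results))
             for _ in range(num_workers)]
    for p in procs:
        p.start()
    try:
        for item in items:
            tasks.put(item)
        for _ in procs:
            tasks.put(None)              # one sentinel per worker
        collected = dict(results.get() for _ in items)
    except KeyboardInterrupt:
        tasks.cancel_join_thread()       # don't block on unread tasks at exit
        for p in procs:
            p.terminate()
        raise
    finally:
        for p in procs:
            p.join()
    return collected


if __name__ == "__main__":
    try:
        print(run(range(10)))
    except KeyboardInterrupt:
        print("You cancelled the program!")
```

The worker loop only needs objects with get and put, so it can be tried in-process with queue.Queue before involving real processes.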
Upvotes: 4
Reputation: 16858
For some reason, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:
from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()
Normally you would get the following output:
starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
So if you hit ^C, you will get:
starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Upvotes: 34
Reputation: 59623
Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written... try changing slowly_square to:
def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0
That should work as you expected.
Upvotes: -4