Reputation: 1257
I am trying to find a way to handle SIGTERM nicely and have my subprocesses terminate when the main process receives a SIGTERM.
Basically, I am creating processes manually (but I believe the issue is the same with mp.Pool, for example):
import multiprocessing as mp
...
workers = [
    mp.Process(
        target=worker,
        args=(...,)
    ) for _ in range(nb_workers)
]
and I am catching signals
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)
When a signal is caught, I want to terminate all my subprocesses and exit. I do not want to wait for them to finish running, as their individual run times can be pretty long (think a few minutes).
Likewise, I cannot really set a threading.Event() that all processes would check periodically, as they are basically just doing one huge but slow operation (depending on a few libraries).
My idea was to set a flag when a signal is caught, and then have a watchdog terminate all subprocesses when the flag is set. But .terminate() also uses SIGTERM, which is caught again by my signal handlers.
Example, simplified code:
import multiprocessing as mp
import signal
import time

FLAG = False

def f(x):
    time.sleep(5)
    print(x)
    return x * x

def term(signum, frame):
    print(f'Received Signal {signum}')
    global FLAG
    FLAG = True

def terminate(w):
    for process in w:
        print('Terminating worker {}'.format(process.pid))
        process.terminate()
        process.join()
        process.close()

signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)

if __name__ == '__main__':
    workers = [
        mp.Process(
            target=f,
            args=(i,)
        ) for i in range(4)
    ]
    for process in workers:
        process.start()
    while not FLAG:
        time.sleep(0.1)
    print('flag set')
    terminate(workers)
    print('Done')
If I interrupt the code before the processes are done (with ctrl-c):
Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2
flag set
Terminating worker 27742
Received Signal 15
0
Terminating worker 27743
Received Signal 15
1
3
2
Terminating worker 27744
Terminating worker 27745
Done
As you can see, it seems that .terminate() does not terminate the subprocesses: they keep running to the end, and it appears we catch the resulting SIGTERM (15) too.
So far, my solutions are:
- threading.Event(). This means completely rethinking what our current processes are doing.
- .kill() instead of .terminate(). This works on Linux, but it is a less clean exit. Not sure about Windows, but I was under the impression that on Windows .kill == .terminate.
Is there any clean way to handle this?
Upvotes: 0
Views: 5498
Reputation: 44108
The solution very much depends on what platform you are running on, as is often the case for Python questions tagged with [multiprocessing], and it is for that reason that one is supposed to also tag such questions with the specific platform, such as [linux]. I am inferring that your platform is not Windows, since signal.SIGQUIT is not defined for that platform. So I will go with Linux.
On Linux, you do not want your subprocesses to inherit the main process's signal handlers (otherwise each of them would also run term on a Ctrl-C interrupt, for example). For Windows, however, you want your subprocesses to ignore these interrupts. That means you want your main process to call signal.signal only after it has created the subprocesses.
Rather than setting FLAG to indicate that the main process should terminate and having the main process loop to test this value periodically, it is simpler, cleaner and more efficient to have the main process just wait on a threading.Event instance, done_event. For some reason, though, this does not seem to work on Windows: the main process's wait call is not satisfied immediately, so the code below keeps the FLAG polling loop on that platform.
The subprocesses are created as daemon processes, and a daemon thread waits for them to finish normally and sets done_event (or FLAG on Windows) when that occurs. So the main process will fall through on the call to done_event.wait() on either an interrupt of some sort or normal completion. All it has to do now is just end normally; there is no need to call terminate against the subprocesses since they will end when the main process ends.
import multiprocessing as mp
from threading import Thread, Event
import signal
import time
import sys

IS_WINDOWS = sys.platform == 'win32'

def f(x):
    if IS_WINDOWS:
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGABRT, signal.SIG_IGN)
    time.sleep(5)
    print(x)
    return x * x

def term(signum, frame):
    print(f'Received Signal {signum}')
    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

def process_wait_thread():
    """
    wait for processes to finish normally and set done_event
    """
    for process in workers:
        process.join()
    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

if __name__ == '__main__':
    if IS_WINDOWS:
        globals()['FLAG'] = False
    else:
        done_event = Event()
    workers = [
        mp.Process(
            target=f,
            args=(i,),
            daemon=True
        ) for i in range(4)
    ]
    for process in workers:
        process.start()
    # We don't want subprocesses to inherit these so
    # call signal after we start the processes:
    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)
    if not IS_WINDOWS:
        signal.signal(signal.SIGQUIT, term)  # Not supported by Windows at all
    signal.signal(signal.SIGABRT, term)
    Thread(target=process_wait_thread, daemon=True).start()
    if IS_WINDOWS:
        while not globals()['FLAG']:
            time.sleep(0.1)
    else:
        done_event.wait()
    print('Done')
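If you would rather keep calling .terminate() from the main process on Linux, another option is to let each child restore the default SIGTERM disposition itself, so the inherited handler no longer swallows the signal. The following is only a minimal sketch under assumptions, not a drop-in replacement: it forces the fork start method (so it is POSIX-only), and parent_handler, worker and run_demo are hypothetical names standing in for your term handler and your real workload.

```python
import multiprocessing as mp
import signal
import time


def parent_handler(signum, frame):
    # Hypothetical stand-in for the question's `term` handler.
    print(f'Parent received signal {signum}')


def worker(ready, seconds):
    # A forked child inherits the parent's handlers; restore the default
    # SIGTERM action so that Process.terminate() really stops this process.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    # Ignore SIGINT so that a Ctrl-C sent to the whole foreground process
    # group is left for the parent to deal with.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    ready.set()          # tell the parent the handlers are now reset
    time.sleep(seconds)  # placeholder for the long, uninterruptible job


def run_demo(nb_workers=2, seconds=30):
    ctx = mp.get_context('fork')  # assumption: POSIX-only start method
    # Install the handler BEFORE forking, as in the question.
    previous = signal.signal(signal.SIGTERM, parent_handler)
    try:
        ready_events = [ctx.Event() for _ in range(nb_workers)]
        workers = [
            ctx.Process(target=worker, args=(ready, seconds))
            for ready in ready_events
        ]
        for process in workers:
            process.start()
        # Wait until every child has reset its handlers, to avoid a race
        # where SIGTERM arrives while the inherited handler is still active.
        for ready in ready_events:
            ready.wait(timeout=5)
        for process in workers:
            process.terminate()
        for process in workers:
            process.join(timeout=5)
        return [process.exitcode for process in workers]
    finally:
        # Put the parent's previous SIGTERM handler back.
        signal.signal(signal.SIGTERM, previous)


if __name__ == '__main__':
    print(run_demo())
```

Since the children no longer run the inherited handler, .terminate() should stop them right away instead of letting them run to completion, and each exitcode is then reported as the negative signal number (-15 for SIGTERM).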
Upvotes: 2