smagnan

Reputation: 1257

How to process SIGTERM and still have a working process.terminate()

I am trying to find a way to handle SIGTERM nicely and have my subprocesses terminate when the main process receives a SIGTERM.

Basically, I am creating processes manually (but I believe the issue is the same with mp.Pool, for example):

import multiprocessing as mp

...

workers = [
    mp.Process(
        target=worker,
        args=(...,)
    ) for _ in range(nb_workers)
]

and I am catching signals:

signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)

When a signal is caught, I want to terminate all my subprocesses and exit. I do not want to wait for them to finish running, as their individual run times can be pretty long (a few minutes).

Likewise, I cannot really set a threading.Event() that all processes would check periodically, as each of them is basically doing one huge but slow operation (which depends on a few libraries).

My idea was to set a flag when a signal is caught, and then have a watchdog terminate all subprocesses when the flag is set. But using .terminate() also uses SIGTERM, which is caught again by my signal handlers.

Example (simplified code):

import multiprocessing as mp
import signal
import time

FLAG = False


def f(x):
    time.sleep(5)
    print(x)
    return x * x


def term(signum, frame):
    print(f'Received Signal {signum}')
    global FLAG
    FLAG = True


def terminate(w):
    for process in w:
        print('Terminating worker {}'.format(process.pid))
        process.terminate()
        process.join()
        process.close()


signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
signal.signal(signal.SIGABRT, term)


if __name__ == '__main__':
    workers = [
        mp.Process(
            target=f,
            args=(i,)
        ) for i in range(4)
    ]
    for process in workers:
        process.start()
    while not FLAG:
        time.sleep(0.1)
    print('flag set')
    terminate(workers)
    print('Done')

If I interrupt the code before the processes are done (with ctrl-c):

Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2
Received Signal 2

flag set
Terminating worker 27742
Received Signal 15
0
Terminating worker 27743
Received Signal 15
1
3
2
Terminating worker 27744
Terminating worker 27745
Done

As you can see, .terminate() does not seem to terminate the subprocesses: they keep running to the end, and it appears that the resulting SIGTERM (15) is caught by my handler too.

So far, my solutions are:

Is there any clean way to handle this?

Upvotes: 0

Views: 5498

Answers (1)

Booboo

Reputation: 44108

The solution depends very much on the platform you are running on, as is often the case for Python questions tagged [multiprocessing]; that is why such questions should also be tagged with the specific platform, such as [linux]. I infer that your platform is not Windows, since signal.SIGQUIT is not defined there, so I will go with Linux.

  1. On Linux you do not want your subprocesses to handle the signals at all (it makes little sense for them to call the function term on a Ctrl-C interrupt, for example). On Windows, however, you want your subprocesses to ignore these interrupts. That means the main process should call signal.signal only after it has started the subprocesses.
  2. Instead of using FLAG to tell the main process to terminate, which forces the main process to poll the value in a loop, it is simpler, cleaner and more efficient to have the main process wait on a threading.Event instance, done_event. For some reason, though, this does not seem to work on Windows: the main process's wait call is not satisfied immediately.
  3. You would like some provision to terminate gracefully if and when your processes complete normally and no signal has been triggered. The easiest way to accomplish all your goals, including this one, is to make your subprocesses daemon processes, which will terminate when the main process terminates. Then create a daemon thread that simply waits for the subprocesses to terminate normally and sets done_event when that occurs. The main process will then fall through the call to done_event.wait() either on an interrupt of some sort or on normal completion. All it has to do at that point is end normally; there is no need to call terminate on the subprocesses, since they will end when the main process ends.
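
Point 2 can be sketched in isolation as follows (Linux/Unix only). This is a minimal illustration under assumptions, not the final program: the helper name wait_for_signal is hypothetical, SIGALRM raised by signal.setitimer stands in for an external SIGTERM so that the example can trigger itself, and the 5-second timeout is only a safety net.

```python
import signal
import threading


def wait_for_signal(delay=0.2):
    """Block the main thread on an Event until a signal handler sets it."""
    done_event = threading.Event()

    def term(signum, frame):
        # The handler does no real work; it only wakes up the main thread.
        done_event.set()

    # Assumption: SIGALRM stands in for SIGTERM so the sketch is self-triggering.
    previous = signal.signal(signal.SIGALRM, term)
    try:
        signal.setitimer(signal.ITIMER_REAL, delay)  # deliver SIGALRM after `delay` seconds
        # No polling loop: the wait is interrupted when the handler runs.
        return done_event.wait(timeout=5)
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)      # cancel the timer if still pending
        signal.signal(signal.SIGALRM, previous)      # restore the earlier handler
```

Calling wait_for_signal() from the main thread should return True shortly after the delay, without any sleep loop. The complete program below combines this kind of wait with daemon workers and the watcher thread.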
import multiprocessing as mp
from threading import Thread, Event
import signal
import time
import sys


IS_WINDOWS = sys.platform == 'win32'

def f(x):
    if IS_WINDOWS:
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGABRT, signal.SIG_IGN)

    time.sleep(5)
    print(x)
    return x * x

def term(signum, frame):
    print(f'Received Signal {signum}')
    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

def process_wait_thread():
    """
    wait for processes to finish normally and set done_event
    """
    for process in workers:
        process.join()

    if IS_WINDOWS:
        globals()['FLAG'] = True
    else:
        done_event.set()

if __name__ == '__main__':

    if IS_WINDOWS:
        globals()['FLAG'] = False
    else:
        done_event = Event()

    workers = [
        mp.Process(
            target=f,
            args=(i,),
            daemon=True
        ) for i in range(4)
    ]
    for process in workers:
        process.start()

    # We don't want subprocesses to inherit these so
    # call signal after we start the processes:
    signal.signal(signal.SIGTERM, term)
    signal.signal(signal.SIGINT, term)
    if not IS_WINDOWS:
        signal.signal(signal.SIGQUIT, term) # Not supported by Windows at all
    signal.signal(signal.SIGABRT, term)

    Thread(target=process_wait_thread, daemon=True).start()

    if IS_WINDOWS:
        while not globals()['FLAG']:
            time.sleep(0.1)
    else:
        done_event.wait()

    print('Done')
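
If you would rather keep your original structure and call terminate() explicitly, one option (a sketch under assumptions, not something the code above does) is to have each worker restore the default SIGTERM action as its first step, so that the handler inherited from the parent no longer swallows the signal. The names worker, run_and_terminate and the ready events are hypothetical, and the 'fork' start method is requested explicitly, so this is Unix-only.

```python
import multiprocessing as mp
import signal
import time


def term(signum, frame):
    # Parent-side handler, as in the question: just report the signal.
    print(f'Received Signal {signum}')


def worker(ready, seconds):
    # A forked child inherits the parent's handlers. Restore the default
    # SIGTERM action so that Process.terminate() really kills this worker.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    # Ignore Ctrl-C in the worker; the parent decides when to stop.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    ready.set()          # tell the parent that the handlers are reset
    time.sleep(seconds)  # stand-in for the long, uninterruptible job


def run_and_terminate(nb_workers=2, seconds=30):
    ctx = mp.get_context('fork')  # assumption: Unix-only start method
    previous = signal.signal(signal.SIGTERM, term)  # installed before forking
    try:
        ready = [ctx.Event() for _ in range(nb_workers)]
        workers = [ctx.Process(target=worker, args=(ev, seconds)) for ev in ready]
        for process in workers:
            process.start()
        for ev in ready:
            ev.wait()  # avoid signalling a worker before it has reset its handlers
        for process in workers:
            process.terminate()  # sends SIGTERM, now handled by the default action
            process.join()
        return [process.exitcode for process in workers]
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the parent's earlier handler
```

On Linux, run_and_terminate() is expected to return a negative exit code for every worker (the negated SIGTERM number), which indicates that each one was killed by the signal instead of running to completion.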

Upvotes: 2
