macro_controller

Reputation: 1519

How to handle abnormal child process termination?

I'm using Python 3.7 and following this documentation. I want a process that spawns a child process, waits for it to finish a task, and gets some info back. I use the following code:

from multiprocessing import Process, Queue

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print(q.get())  # blocks until the child puts a result
    p.join()

When the child process finishes correctly there is no problem and it works great, but when the child process is terminated before it finishes, my application hangs while waiting.

Giving a timeout to q.get() and p.join() does not completely solve the issue, because I want to know immediately that the child process died rather than wait for the timeout.

Another problem is that a timeout on q.get() raises an exception, which I would prefer to avoid.

Can someone suggest a more elegant way to overcome these issues?

Upvotes: 1

Views: 1992

Answers (1)

Darkonaut

Reputation: 21654

Queue & Signal

One possibility would be to register a signal handler and use it to pass a sentinel value. On Unix you could handle SIGCHLD in the parent, but that's not an option in your case. According to the documentation of the signal module:

On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK.

Not sure if killing it through Task-Manager translates into SIGTERM, but you can give it a try.

To handle SIGTERM, you would need to register the signal handler in the child.

import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue

SENTINEL = None


def _sigterm_handler(signum, frame, queue):
    print("received SIGTERM")
    queue.put(SENTINEL)
    sys.exit()


def register_sigterm(queue):
    # bind the queue so the handler keeps the (signum, frame) call signature
    handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, handler)


def some_func(q):
    register_sigterm(q)
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    for msg in iter(q.get, SENTINEL):
        print(msg)
    p.join()

Example Output:

12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM

Process finished with exit code 0

Queue & Process.is_alive()

Even if this works with Task-Manager, your use-case sounds like you can't exclude force kills, so I think you're better off with an approach which doesn't rely on signals.

You can check p.is_alive() in a loop, call queue.get() with a timeout, and handle the Empty exception:

import os
import time
from queue import Empty
from multiprocessing import Process, Queue

def some_func(q):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            msg = q.get(timeout=0.1)
        except Empty:
            pass
        else:
            print(msg)

    p.join()
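
One caveat with the loop above: is_alive() can turn False while messages are still sitting in the queue, for instance when the child puts several items right before it exits. Here is a minimal sketch of one way to handle that. It assumes a hypothetical helper drain() (my name, not part of multiprocessing) and a shortened child loop. It reads whatever is left once the loop ends and then looks at p.exitcode:

```python
import os
import time
from queue import Empty
from multiprocessing import Process, Queue


def some_func(q):
    print(os.getpid())
    for i in range(3):  # shortened loop, for illustration only
        time.sleep(0.1)
        q.put(f'msg_{i}')


def drain(q, timeout=0.1):
    """Hypothetical helper: yield what is left in q, stop once it stays empty."""
    while True:
        try:
            yield q.get(timeout=timeout)
        except Empty:
            return


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            print(q.get(timeout=0.1))
        except Empty:
            pass

    # the child is gone, but some messages may still be unread
    for msg in drain(q):
        print(msg)

    p.join()
    print(f'exit code: {p.exitcode}')  # non-zero hints at an abnormal end
```

According to the docs, a negative exitcode -N means the child was terminated by signal N. I haven't verified what a Task-Manager kill reports on Windows.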

It would also be possible to avoid the exception, but I wouldn't recommend it, because you don't spend your waiting time "on the queue", which decreases responsiveness:

while p.is_alive():
    if not q.empty():
        msg = q.get_nowait()
        print(msg)
    time.sleep(0.1)  # sleep on every pass, otherwise an empty queue busy-spins

Pipe & Process.is_alive()

If you intend to use one connection per child, you could use a pipe instead of a queue. It's more performant than a queue (which is built on top of a pipe), and you can use multiprocessing.connection.wait (Python 3.3+) to await readiness of multiple objects at once.

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

For both Unix and Windows, an object can appear in object_list if it is a readable Connection object; a connected and readable socket.socket object; or the sentinel attribute of a Process object. A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.

Unix: wait(object_list, timeout) almost equivalent select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)

You can use this to await the sentinel attribute of the process and the parent's end of the pipe concurrently.

import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        conn_write.send(f'msg_{i}')


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])  # block-wait until something gets ready
        if conn_read.poll():  # check if something can be received
            print(conn_read.recv())
    p.join()
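
As with the queue version, the loop above can leave unread messages in the pipe once is_alive() turns False. Here is a sketch of a variant, assuming a hypothetical generator receive_all() (my name, not a library function). The parent closes its own copy of the write end, as the docs example for wait() does, keeps reading while the pipe has data, and stops on EOF or when only the sentinel is ready. The child loop is shortened for illustration.

```python
import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(3):  # shortened loop, for illustration only
        time.sleep(0.1)
        conn_write.send(f'msg_{i}')
    conn_write.close()


def receive_all(proc, conn_read):
    """Hypothetical helper: yield messages until the child is gone and the pipe is empty."""
    while True:
        ready = wait([proc.sentinel, conn_read])  # blocks without a timeout
        if conn_read in ready:
            try:
                yield conn_read.recv()
            except EOFError:  # every write end is closed, nothing more will come
                return
        else:
            # only the sentinel is ready: the child ended and the pipe is empty
            return


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()
    conn_write.close()  # the parent keeps only the read end

    for msg in receive_all(p, conn_read):
        print(msg)

    p.join()
    if p.exitcode != 0:
        print(f'child ended abnormally, exit code {p.exitcode}')
```

This needs neither a timeout nor an Empty exception in the parent, and wait() returns as soon as the child dies, whatever killed it.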

Upvotes: 1
