BlackCatCoder

Reputation: 51

How does Ctrl-C work with multiple processes in Python?

I am trying to distribute work across multiple processes. Exceptions that occur in a worker process should be propagated back to the main process and handled there. This seems to work for exceptions raised in worker, but not for Ctrl-C.

import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback


def worker():
    time.sleep(5)
    # raise RuntimeError("Some Error")
    return True


def main():
    with ProcessPoolExecutor() as executor:
        stop = False
        tasks = set()

        while True:
            try:
                # keep submitting new tasks
                if not stop:
                    time.sleep(0.1)
                    print("Submitting task to worker")
                    future = executor.submit(worker)
                    tasks.add(future)

                done, not_done = wait(tasks, timeout=0)
                tasks = not_done

                # get the result
                for future in done:
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f"Exception in worker: {type(e).__name__}: {e}")
                    else:
                        print(f"Worker finished with result: {result}")

                # exit loop if there are no tasks left and loop was stopped
                if stop:
                    print(f"Waiting for {len(tasks)} to finish.")
                    if not len(tasks):
                        print("Finished all remaining tasks")
                        break
                    time.sleep(1)
            except KeyboardInterrupt:
                print("Recieved Ctrl-C")
                stop = True
            except Exception as e:
                print(f"Caught {e}")
                stop = True


if __name__ == "__main__":
    main()

Some of my observations:

I suspect that if Ctrl-C is pressed while a process is being spawned it could lead to some unexpected behaviour.

Edit: These problems occur on both Linux and Windows. Ideally there is a solution for both, but if that is not possible, the solution should at least work on Linux.

Upvotes: 3

Views: 372

Answers (1)

Booboo

Reputation: 44283

It wasn't clear to me whether you want your worker function to continue running until completion (normal or abnormal) ignoring any Ctrl-C events. Assuming that to be the case, the following code should work under both Linux and Windows.

The idea is to use a "pool initializer", i.e. a function that runs once in each pool process before it executes any submitted tasks. Here the initializer sets the SIGINT handler to signal.SIG_IGN, so Ctrl-C is ignored in the pool processes and never raises KeyboardInterrupt there.

Note that I have made a few other code adjustments (marked with comments).

import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback
import signal

def init_pool_processes():
    # Ignore Ctrl-C
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def worker():
    time.sleep(5)
    # raise RuntimeError("Some Error")
    return True


def main():
    # Create pool child processes, which will now
    # ignore Ctrl-C
    with ProcessPoolExecutor(initializer=init_pool_processes) as executor:
        stop = False
        tasks = set()

        while True:
            try:
                # keep submitting new tasks
                if not stop:
                    print("Submitting task to worker")
                    future = executor.submit(worker)
                    tasks.add(future)
                    # Moved here (was before the submit):
                    time.sleep(0.1)

                done, not_done = wait(tasks, timeout=0)
                tasks = not_done

                # get the result
                for future in done:
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f"Exception in worker: {type(e).__name__}: {e}")
                    else:
                        print(f"Worker finished with result: {result}")

                # exit loop if there are no tasks left and loop was stopped
                if stop:
                    print(f"Waiting for {len(tasks)} to finish.")
                    if not len(tasks):
                        print("Finished all remaining tasks")
                        break
                    time.sleep(1)
            except KeyboardInterrupt:
                # Ignore future Ctrl-C events:
                signal.signal(signal.SIGINT, signal.SIG_IGN)
                print("Received Ctrl-C")  # spelling fixed
                stop = True
            except Exception as e:
                print(f"Caught {e}")
                # Ignore future Ctrl-C events:
                if not stop:
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                    stop = True


if __name__ == "__main__":
    main()
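
If you want to convince yourself that the initializer really took effect in the pool processes, something along the following lines might help. It is only a minimal sketch: the names ignore_sigint, sigint_is_ignored and check_workers are made up for this illustration and are not part of the code above. Each task simply reports whether SIGINT is currently set to be ignored in the process that runs it.

```python
import signal
from concurrent.futures import ProcessPoolExecutor


def ignore_sigint():
    # Hypothetical initializer name; same idea as init_pool_processes above.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def sigint_is_ignored():
    # Runs in whichever process executes the task and reports whether
    # that process currently ignores SIGINT.
    return signal.getsignal(signal.SIGINT) == signal.SIG_IGN


def check_workers(n_tasks=4):
    # Hypothetical helper: submit a few tasks and collect what each
    # pool process reports about its SIGINT handling.
    with ProcessPoolExecutor(max_workers=2, initializer=ignore_sigint) as executor:
        futures = [executor.submit(sigint_is_ignored) for _ in range(n_tasks)]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(check_workers())
```

Calling sigint_is_ignored() directly in the main process gives you the comparison value: it reports the main process's own handler, which the initializer does not touch.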

Upvotes: 1
