Vytas

Reputation: 49

Multiprocessing KeyboardInterrupt handling

This problem has been eluding me: all the solutions I've found are more like workarounds and add quite a bit of complexity to the code. Since it's been a good while since anything was posted about this, is there a simple way to do the following: upon detecting a keyboard interrupt, cleanly exit all the child processes and terminate the program?

The code below is a snippet of my multiprocessing structure. I'd like to preserve as much of it as possible while adding the needed functionality:

from multiprocessing import Pool, Lock
import time

def multiprocess_init(l):
    global lock
    lock = l

def synchronous_print(i):
    with lock:
        print i
        time.sleep(1)

if __name__ == '__main__':

    lock = Lock()
    pool = Pool(processes=5, initializer=multiprocess_init, initargs=(lock, ))

    for i in range(1,20):
        pool.map_async(synchronous_print, [i])

    pool.close() #necessary to prevent zombies
    pool.join() #wait for all processes to finish

Upvotes: 0

Views: 1035

Answers (1)

tdelaney

Reputation: 77337

The short answer is to move to Python 3. Python 2 has multiple problems with thread/process synchronization that have been fixed in Python 3.

In your case, multiprocessing will doggedly recreate your child processes every time you send a keyboard interrupt, and pool.close will get stuck and never exit. You can reduce the problem by explicitly exiting the child process with os._exit (which ends the process immediately, without running cleanup handlers) and by waiting for the individual results from map_async so that you don't get stuck in pool.close prison.

from multiprocessing import Pool, Lock
import time
import os

def multiprocess_init(l):
    global lock
    lock = l
    print("initialized child")

def synchronous_print(i):
    try:
        with lock:
            print i
            time.sleep(1)
    except KeyboardInterrupt:
        print("exit child")
        os._exit(2)

if __name__ == '__main__':

    lock = Lock()
    pool = Pool(processes=5, initializer=multiprocess_init, initargs=(lock, ))

    results = []
    for i in range(1,20):
        results.append(pool.map_async(synchronous_print, [i]))

    for result in results:
        print('wait result')
        result.wait()

    pool.close() #necessary to prevent zombies
    pool.join() #wait for all processes to finish
    print("Join completes")
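
For the Python 3 route suggested at the top of this answer, here is a minimal sketch of one common pattern, not something taken from the question: the workers ignore SIGINT so that only the parent sees the KeyboardInterrupt, and the parent then terminates the pool. The names worker_init and run are hypothetical helpers introduced for illustration, and the timed wait is a conservative assumption to keep the parent responsive to Ctrl-C.

```python
from multiprocessing import Pool, Lock
import signal
import time


def worker_init(shared_lock):
    # Hypothetical initializer: keep the shared lock and make this worker
    # ignore Ctrl-C, so only the parent process handles the interrupt.
    global lock
    lock = shared_lock
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def synchronous_print(i):
    with lock:
        print(i)
        time.sleep(1)
    return i


def run(items, processes=5):
    # Hypothetical helper: run the jobs and always tear the pool down.
    pool = Pool(processes, initializer=worker_init, initargs=(Lock(),))
    try:
        pending = pool.map_async(synchronous_print, items)
        # Wait in short slices instead of blocking indefinitely.
        while not pending.ready():
            pending.wait(0.5)
        return pending.get()
    finally:
        # Reached on success, on error, and on Ctrl-C alike.
        pool.terminate()
        pool.join()


if __name__ == '__main__':
    try:
        run(range(1, 20))
        print("all jobs finished")
    except KeyboardInterrupt:
        print("interrupted, workers terminated")
```

Because the teardown lives in the finally clause, the rest of the structure (initializer, shared lock, map_async) stays close to the original. Note that terminate stops workers without letting in-flight jobs finish, so this suits jobs that are safe to abandon midway.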

Upvotes: 1
