usertwelve

Reputation: 11

IOError: Broken pipe with multiprocessing.Pool and multiprocessing.Manager after Ctrl+C

I was using a multiprocessing.Pool from Python's standard library to run a bunch of workers. Each worker started subprocesses using Python's subprocess library, and it was each worker's responsibility to manage those subprocesses and clean them up when it finished.

import multiprocessing as mp
import time


def main():
    processes = 3
    jobs = 6
    pool = mp.Pool(processes)
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


main()
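
For a sense of what the stubbed worker does, here is a rough sketch; the sleep command is a placeholder standing in for my real subprocesses:

import subprocess


def worker(i):
    # start a subprocess (placeholder command)
    proc = subprocess.Popen(['sleep', '1'])
    try:
        # wait for completion
        proc.wait()
    finally:
        # clean up: make sure the subprocess is not left running
        if proc.poll() is None:
            proc.terminate()
            proc.wait()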

It started when I was trying to catch a KeyboardInterrupt in a sane way when pressing Ctrl+C to exit the script from the command line. Note that this example is a small version of my actual program that does its best to illustrate the problem I ran into. I found these related posts on Stack Overflow:

the former being more applicable than the latter. Investigating, I found that a signal.SIGINT is sent to all the processes (the parent/main process and all of its children and subchildren) when pressing Ctrl+C on the command line. For reference, I was on Ubuntu 18.04 using a bash terminal.
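
A minimal sketch (hypothetical, Unix-only) demonstrates this: it simulates Ctrl+C by sending signal.SIGINT to the whole foreground process group, and both the parent and the child report receiving it:

import multiprocessing as mp
import os
import signal
import time


def child():
    try:
        time.sleep(10)
    except KeyboardInterrupt:
        print('child %d got SIGINT' % os.getpid())


proc = mp.Process(target=child)
proc.start()
time.sleep(0.5)  # give the child time to reach its sleep
try:
    # simulate Ctrl+C: the terminal sends SIGINT to the whole process group
    os.killpg(os.getpgrp(), signal.SIGINT)
    proc.join()
except KeyboardInterrupt:
    print('parent %d got SIGINT' % os.getpid())
    proc.join()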

I took the suggested approach and ignored the interrupt signal in the child processes. I wrote myself a context manager for convenience and my own sanity.

import multiprocessing as mp
import contextlib # <---
import signal # <---
import time


def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals(): # <---
        pool = mp.Pool(processes)

    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()


@contextlib.contextmanager
def ignore_interrupt_signals(): # <---
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)


main()

This worked fine other than that the worker needed a way to know that it should shutdown and importantly clean up all its spawned subprocesses. For context each worker takes around 45 minutes to complete.

I decided the best way to do this was to use a multiprocessing.Event. The event would be called stop_event and would be set if any process received a signal.SIGINT. To use a multiprocessing.Event that is used by the main and child processes it had to be managed by an multiprocessing.Manager through a proxy.

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)

    manager = mp.Manager() # <---
    stop_event = manager.Event() # <---

    try:
        for i in range(jobs):
            args = (i, stop_event)
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt: # <---
        stop_event.set() # <---
        pool.close()
        pool.join()
        sys.exit() # <---


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event): # <---
    # start processes
    while not stop_event.is_set(): # <---
        # wait for completion
        time.sleep(1)
    # clean up


main()

Now the workers were safe from being interrupted before they could clean up their subprocesses, and the main process catches the KeyboardInterrupt after the pool is created. When the exception is caught, the stop event is set, the pool is closed, and the processes are joined. I thought this would work. However, I got an IOError from the stop_event.set() call.

Termination started due to Ctrl+C, cleaning up...
Traceback (most recent call last):
  File "...", line 1109, in <module>
    main()
  File "...", line 42, in main
    args.func(args)
  File "...", line 189, in run_command
    stop_event.set()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1011, in set
    return self._callmethod('set')
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

The traceback above has unrelated frames removed; the part of interest is the broken pipe raised when trying to set the stop_event through its manager proxy.

Upvotes: 0

Views: 1644

Answers (1)

usertwelve

Reputation: 11

The multiprocessing.Manager is started as a server, and that server runs as another process. Therefore, the manager process also receives the signal.SIGINT from Ctrl+C and terminates, which causes the pipes used by the stop_event manager proxy to go down ungracefully. The best way I found to avoid this problem is to start the manager with signal.SIGINT ignored as well.

import multiprocessing as mp
import contextlib
import signal
import sys
import time


def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
        manager = mp.Manager() # <---

    stop_event = manager.Event()

    try:
        for i in range(jobs):
            args = (i, stop_event)
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        stop_event.set()
        pool.close()
        pool.join()
        sys.exit()


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():
        # wait for completion
        time.sleep(1)
    # clean up


main()
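
As an aside, an equivalent way to do this (a sketch I have not used in my actual program) is to pass an initializer to multiprocessing.Pool and to the manager's start() method, so every child process ignores signal.SIGINT from the moment it starts:

import multiprocessing as mp
import multiprocessing.managers
import signal


def init_ignore_sigint():
    # runs inside each freshly started child process
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = mp.Pool(3, initializer=init_ignore_sigint)

manager = mp.managers.SyncManager()
manager.start(init_ignore_sigint)
stop_event = manager.Event()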

I answered my own question because I hope this can be helpful to someone else, as I spent way too much time tracing socket and pipe errors in my code base before arriving at this rather simple solution.

Upvotes: 1
