Reputation: 11
I was using a multiprocessing.Pool from Python's standard library to run a bunch of workers. Each worker started subprocesses using Python's subprocess library, and it was each worker's responsibility to manage those subprocesses and clean them up when it finished.
import multiprocessing as mp
import time

def main():
    processes = 3
    jobs = 6

    pool = mp.Pool(processes)
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()

def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)

main()
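For illustration, each worker's subprocess handling might look something like the sketch below; the command is a made-up placeholder, since the real program's details are elided above:

import subprocess

def worker(i):
    # start a subprocess (hypothetical command standing in for the real one)
    proc = subprocess.Popen(['sleep', '60'])
    try:
        # wait for completion
        proc.wait()
    finally:
        # clean up: make sure the subprocess is gone even if something failed
        if proc.poll() is None:
            proc.terminate()
            proc.wait()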
It started when I was trying to catch a KeyboardInterrupt in a sane way when pressing Ctrl+C to exit the script from the command line. Note that this example is a small version of my actual program that does its best to illustrate the problem I ran into. I found these related posts on Stack Overflow:
the former being more applicable than the latter. Investigating, I found that a signal.SIGINT is sent to all of the processes (the main process and all of its children and subchildren) when Ctrl+C is pressed on the command line. For reference, I was on Ubuntu 18.04 using a bash terminal.
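This is easy to confirm: the terminal delivers SIGINT to the entire foreground process group, and processes spawned by multiprocessing stay in that group. A minimal sketch (Linux, assuming the default fork start method):

import os
import multiprocessing as mp

def child():
    # same process group as the parent, so Ctrl+C reaches it too
    print('child pgid:', os.getpgrp())

if __name__ == '__main__':
    print('parent pgid:', os.getpgrp())
    p = mp.Process(target=child)
    p.start()
    p.join()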
I took the suggested approach and ignored the interrupt signal in the child processes. For convenience, and for my own sanity, I wrote a context manager:
import multiprocessing as mp
import contextlib  # <---
import signal  # <---
import time

def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():  # <---
        pool = mp.Pool(processes)

    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()

@contextlib.contextmanager
def ignore_interrupt_signals():  # <---
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    yield
    signal.signal(signal.SIGINT, previous_handler)

def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)

main()
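For reference, the same effect can be achieved without the context manager by passing an initializer to the pool, which each worker process runs as it starts. A minimal sketch of that alternative:

import multiprocessing as mp
import signal

def init_worker():
    # runs once in every pool worker before it picks up any tasks
    signal.signal(signal.SIGINT, signal.SIG_IGN)

pool = mp.Pool(3, initializer=init_worker)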
This worked fine, except that the workers now needed a way to know that they should shut down and, importantly, clean up all of their spawned subprocesses. For context, each worker takes around 45 minutes to complete.
I decided the best way to do this was with a multiprocessing.Event. The event, called stop_event, would be set if any process received a signal.SIGINT. Because the event has to be shared between the main process and the pool workers, it must be managed by a multiprocessing.Manager and accessed through a proxy.
import multiprocessing as mp
import contextlib
import signal
import sys
import time

def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)

    manager = mp.Manager()  # <---
    stop_event = manager.Event()  # <---

    try:
        for i in range(jobs):
            args = (i, stop_event)
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt:  # <---
        stop_event.set()  # <---
        pool.close()
        pool.join()
        sys.exit()  # <---

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    yield
    signal.signal(signal.SIGINT, previous_handler)

def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():
        # wait for completion
        time.sleep(1)
    # clean up

main()
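As an aside, the Manager really is required here: a plain multiprocessing.Event is backed by OS-level primitives and cannot be pickled into a pool task, whereas a manager Event is a picklable proxy. A quick sketch of the failure (the exact message may vary by Python version):

import multiprocessing as mp
import pickle

try:
    # this is effectively what apply_async would have to do with a plain Event
    pickle.dumps(mp.Event())
except RuntimeError as err:
    print(err)  # "... should only be shared between processes through inheritance"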
Now the workers were safe from being interrupted before cleaning up their subprocesses, and the main process catches the KeyboardInterrupt once the pool has been created. When the exception is caught, the pool is closed and the worker processes are joined. I thought this would work. However, I got an IOError from the stop_event.set() call:
Termination started due to Ctrl+C, cleaning up...
Traceback (most recent call last):
File "...", line 1109, in <module>
main()
File "...", line 42, in main
args.func(args)
File "...", line 189, in run_command
stop_event.set()
File "/usr/lib/python2.7/multiprocessing/managers.py", line 1011, in set
return self._callmethod('set')
File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
The traceback above has a number of unrelated frames removed; the part of interest is the broken pipe raised when trying to set stop_event through the manager proxy.
Upvotes: 0
Views: 1644
Reputation: 11
The multiprocessing.Manager starts a server, and that server runs as a separate process. The manager process therefore also receives the signal.SIGINT from Ctrl+C and terminates, which tears down the pipes used by the stop_event proxy ungracefully. The best way I found to avoid this problem is to start the manager while signal.SIGINT is ignored as well, so the manager process inherits the ignored handler:
import multiprocessing as mp
import contextlib
import signal
import sys
import time

def main():
    processes = 3
    jobs = 6

    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
        manager = mp.Manager()  # <---

    stop_event = manager.Event()

    try:
        for i in range(jobs):
            args = (i, stop_event)
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        stop_event.set()
        pool.close()
        pool.join()
        sys.exit()

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    yield
    signal.signal(signal.SIGINT, previous_handler)

def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():
        # wait for completion
        time.sleep(1)
    # clean up

main()
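An equivalent approach, for the record, is to give the manager process its own initializer via multiprocessing.managers.BaseManager.start(), which avoids touching the parent's handler at all. A sketch:

import signal
from multiprocessing.managers import SyncManager

def ignore_sigint():
    # runs in the manager's server process before it starts serving
    signal.signal(signal.SIGINT, signal.SIG_IGN)

manager = SyncManager()
manager.start(ignore_sigint)  # the manager process now ignores Ctrl+C
stop_event = manager.Event()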
I answered my own question because I hope it can be helpful to someone else; I spent way too much time tracing socket and pipe errors in my code base before arriving at this rather simple solution.
Upvotes: 1