Reputation: 1665
I wrote a program using the module multiprocessing which globally executes as follows:

1. A simulation process and a ui process are started.
2. The simulation process feeds a queue with new simulation states. If the queue is full, the simulation loop isn't blocked, so it can handle possible incoming messages.
3. The ui process consumes the simulation queue.
4. The ui process sends a quit event to the main process, then exits its loop. Upon exiting, it sends a stopped event to the main process through _create_process()'s inner wrapper() function.
5. The quit event results in the main process sending stop signals to all the child processes, while each stopped event increments a counter in the main loop, which will cause it to exit after having received as many stopped events as there are processes.
6. The simulation process receives the stop event and exits its loop, sending in turn a stopped event to the main process.
7. The main process has now received 2 stopped events in total and concludes that all child processes are (on their way to be) stopped. As a result, the main loop is exited.
8. The run() function flushes the queues which have been written to by the child processes.

The problem is that quite often (but not always) the program will hang upon trying to join the simulation process, as per the log below.
[...]
[INFO/ui] process exiting with exitcode 0
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/simulation] Queue._start_thread()
[DEBUG/simulation] doing self._thread.start()
[DEBUG/simulation] starting thread to feed data to pipe
[DEBUG/simulation] ... done self._thread.start()
[DEBUG/simulation] telling queue thread to quit
[DEBUG/MainProcess] all child processes (2) should have been stopped!
[INFO/simulation] process shutting down
[DEBUG/simulation] running all "atexit" finalizers with priority >= 0
[DEBUG/simulation] telling queue thread to quit
[DEBUG/simulation] running the remaining "atexit" finalizers
[DEBUG/simulation] joining queue thread
[DEBUG/MainProcess] joining process <Process(simulation, started)>
[DEBUG/simulation] feeder thread got sentinel -- exiting
[DEBUG/simulation] ... queue thread joined
[DEBUG/simulation] joining queue thread
Stopping the execution with Ctrl + C in the shell results in these mangled tracebacks:
Process simulation:
Traceback (most recent call last):
Traceback (most recent call last):
File "./debug.py", line 224, in <module>
run()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 257, in _bootstrap
util._exit_function()
File "./debug.py", line 92, in run
process.join() #< This doesn't work.
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 312, in _exit_function
_run_finalizers()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 121, in join
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 252, in _run_finalizers
finalizer()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185, in __call__
res = self._callback(*self._args, **self._kwargs)
res = self._popen.wait(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 54, in wait
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 196, in _finalize_join
thread.join()
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 30, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1060, in join
self._wait_for_tstate_lock()
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1076, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
As for the code, here is a stripped-down version of it (which is why parts of it may seem incomplete):
#!/usr/bin/env python3
import logging
import multiprocessing
import pickle
import queue
import time
from collections import namedtuple
_LOGGER = multiprocessing.log_to_stderr()
_LOGGER.setLevel(logging.DEBUG)
_BUFFER_SIZE = 4
_DATA_LENGTH = 2 ** 12
_STATUS_SUCCESS = 0
_STATUS_FAILURE = 1
_EVENT_ERROR = 0
_EVENT_QUIT = 1
_EVENT_STOPPED = 2
_MESSAGE_STOP = 0
_MESSAGE_EVENT = 1
_MESSAGE_SIMULATION_UPDATE = 2
_Message = namedtuple('_Message', ('type', 'value',))
_StopMessage = namedtuple('_StopMessage', ())
_EventMessage = namedtuple('_EventMessage', ('type', 'value',))
_SimulationUpdateMessage = namedtuple('_SimulationUpdateMessage', ('state',))
_MESSAGE_STRUCTS = {
_MESSAGE_STOP: _StopMessage,
_MESSAGE_EVENT: _EventMessage,
_MESSAGE_SIMULATION_UPDATE: _SimulationUpdateMessage
}
def run():
    # Messages from the main process to the child ones.
    downward_queue = multiprocessing.Queue()
    # Messages from the child processes to the main one.
    upward_queue = multiprocessing.Queue()
    # Messages from the simulation process to the UI one.
    simulation_to_ui_queue = multiprocessing.Queue(maxsize=_BUFFER_SIZE)
    # Regroup all the queues that can be written by child processes.
    child_process_queues = (upward_queue, simulation_to_ui_queue,)

    processes = (
        _create_process(
            _simulation,
            upward_queue,
            name='simulation',
            args=(
                simulation_to_ui_queue,
                downward_queue
            )
        ),
        _create_process(
            _ui,
            upward_queue,
            name='ui',
            args=(
                upward_queue,
                simulation_to_ui_queue,
                downward_queue
            )
        )
    )

    try:
        for process in processes:
            process.start()

        _main(downward_queue, upward_queue, len(processes))
    finally:
        # while True:
        #     alive_processes = tuple(process for process in processes
        #                             if process.is_alive())
        #     if not alive_processes:
        #         break
        #     _LOGGER.debug("processes still alive: %s" % (alive_processes,))

        for q in child_process_queues:
            _flush_queue(q)

        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # process.terminate()  #< This works!
            process.join()  #< This doesn't work.

def _main(downward_queue, upward_queue, process_count):
    try:
        stopped_count = 0
        while True:
            message = _receive_message(upward_queue, False)
            if message is not None and message.type == _MESSAGE_EVENT:
                event_type = message.value.type
                if event_type in (_EVENT_QUIT, _EVENT_ERROR):
                    break
                elif event_type == _EVENT_STOPPED:
                    stopped_count += 1
                    if stopped_count >= process_count:
                        break
    finally:
        # Whatever happens, make sure that all child processes have stopped.
        if stopped_count >= process_count:
            return

        # Send a 'stop' signal to all the child processes.
        for _ in range(process_count):
            _send_message(downward_queue, True, _MESSAGE_STOP)

        while True:
            message = _receive_message(upward_queue, False)
            if (message is not None
                    and message.type == _MESSAGE_EVENT
                    and message.value.type == _EVENT_STOPPED):
                stopped_count += 1
                if stopped_count >= process_count:
                    _LOGGER.debug(
                        "all child processes (%d) should have been stopped!"
                        % stopped_count
                    )
                    break

def _simulation(simulation_to_ui_queue, downward_queue):
    simulation_state = [i * 0.123 for i in range(_DATA_LENGTH)]

    # When the queue is full (possibly from reaching _BUFFER_SIZE), the next
    # solve is computed and kept around until the queue is being consumed.
    next_solve_message = None

    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if next_solve_message is None:
            # _step(simulation_state)

            # Somehow the copy (pickle) seems to increase the chances for
            # the issue to happen.
            next_solve_message = _SimulationUpdateMessage(
                state=pickle.dumps(simulation_state)
            )

        status = _send_message(simulation_to_ui_queue, False,
                               _MESSAGE_SIMULATION_UPDATE,
                               **next_solve_message._asdict())
        if status == _STATUS_SUCCESS:
            next_solve_message = None

def _ui(upward_queue, simulation_to_ui_queue, downward_queue):
    time_start = -1.0
    previous_time = 0.0

    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if time_start < 0:
            current_time = 0.0
            time_start = time.perf_counter()
        else:
            current_time = time.perf_counter() - time_start

        message = _receive_message(simulation_to_ui_queue, False)

        if current_time > 1.0:
            _LOGGER.debug("asking to quit")
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_QUIT, value=None)
            break

        previous_time = current_time

def _create_process(target, upward_queue, name='', args=None):
    def wrapper(function, upward_queue, *args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_ERROR, value=None)
        finally:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_STOPPED, value=None)
            upward_queue.close()

    process = multiprocessing.Process(
        target=wrapper,
        name=name,
        args=(target, upward_queue) + args,
        kwargs={}
    )
    return process

def _receive_message(q, block):
    try:
        message = q.get(block=block)
    except queue.Empty:
        return None

    return message

def _send_message(q, block, message_type, **kwargs):
    message_value = _MESSAGE_STRUCTS[message_type](**kwargs)
    try:
        q.put(_Message(type=message_type, value=message_value), block=block)
    except queue.Full:
        return _STATUS_FAILURE

    return _STATUS_SUCCESS

def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass

if __name__ == '__main__':
    run()
Related questions on StackOverflow and hints in Python's doc basically boil down to needing to flush the queues before joining the processes, which I believe I've been trying to do here. I realize that the simulation queue could still be trying to push the (potentially large) buffered data onto the pipe by the time the program tries to flush it upon exiting, thus ending up with still non-empty queues. This is why I tried to ensure that all the child processes were stopped before reaching this point.

Now, looking at the log above and at the additional log output after uncommenting the while True loop checking for alive processes, it appears that the simulation process simply doesn't want to completely shut down even though its target function definitely exited. Could this be the reason for my problem?

If so, how am I supposed to deal with it cleanly? Otherwise, what am I missing here?
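For reference, this looks like the pitfall described in the multiprocessing programming guidelines about joining processes that use queues: a process that has put items on a queue will not terminate until the queue's feeder thread has pushed all the buffered data to the underlying pipe, so joining it before the queue has been drained can deadlock. A minimal sketch of that pitfall, not taken from my program, would be:

import multiprocessing

def _child(q):
    # Leave a payload larger than the pipe buffer sitting in the queue.
    q.put('x' * 1000000)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_child, args=(q,))
    p.start()
    p.join()  # may hang: the feeder thread still holds undelivered data
    print(len(q.get()))  # draining the queue first would avoid the hang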
Tested with Python 3.4 on Mac OS X 10.9.5.
PS: I'm wondering if this couldn't be related to this bug?
Upvotes: 1
Views: 998
Reputation: 188
Lately I ran into a use case similar to yours: multiple processes (up to 11), one input queue, one output queue, but a very heavy output queue.
I was getting an overhead of up to 5 seconds (!) using your suggestion to perform while process.is_alive(): flush_the_queues() before the process.join().
I've reduced that overhead down to 0.7 seconds by relying on a multiprocessing.Manager.list instead of a multiprocessing.Queue for the output queue. The multiprocessing.Manager.list doesn't need any flushing. I might also consider finding an alternative to the input queue if I can.
Full example here:
import multiprocessing
import queue
import time

PROCESSES = multiprocessing.cpu_count() - 1
processes = []

def run():
    start = time.time()
    input_queue = multiprocessing.Queue()
    _feed_input_queue(input_queue)

    with multiprocessing.Manager() as manager:
        output_list = manager.list()

        for _ in range(PROCESSES):
            p = multiprocessing.Process(target=_execute, args=(input_queue, output_list))
            processes.append(p)
            p.start()

        print(f"Time to process = {time.time() - start:.10f}")
        start = time.time()

        for p in processes:
            while p.is_alive():  # in principle we could get rid of this if we find an alternative to the output queue
                _flush_queue(input_queue)
            p.join()

        print(f"Time to join = {time.time() - start:.10f}")

        # from here you can do something with the output_list

def _feed_input_queue(input_queue):
    for i in range(10000):
        input_queue.put(i)

def _execute(input_queue: multiprocessing.Queue, output_list: list):
    while not input_queue.empty():
        input_item = input_queue.get()
        output_list.append(do_and_return_something_heavy(input_item))
    return True

def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass

def do_and_return_something_heavy(input_item):
    return str(input_item) * 100000

if __name__ == '__main__':
    run()
Output
Time to process = 0.1855618954
Time to join = 0.6889970303
Tested on Python 3.6.
Upvotes: 0
Reputation: 1665
Sounds like the issue was indeed due to some delay in pushing the data through the queue, causing the flushes to be ineffective because they were fired too early.
A simple while process.is_alive(): flush_the_queues() seems to do the trick!
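Concretely, a sketch of how the cleanup at the end of run() changes (same _flush_queue() helper and child_process_queues tuple as in the question):

    finally:
        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # Keep draining the queues written by the child processes until the
            # child has actually exited, so its queue feeder threads can finish
            # pushing any buffered data into the pipes.
            while process.is_alive():
                for q in child_process_queues:
                    _flush_queue(q)
            process.join()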
Upvotes: 1