ChristopherC

Reputation: 1665

Processes not joining even after flushing their queues

I wrote a program using the multiprocessing module which, at a high level, executes as follows:

  1. both a simulation process and a UI process are started.
  2. the simulation process feeds a queue with new simulation states. If the queue is full, the simulation loop doesn't block, so it can still handle incoming messages.
  3. the UI process consumes the simulation queue.
  4. after around 1 second of execution time, the UI process sends a quit event to the main process, then exits its loop. Upon exiting, it sends a stopped event to the main process through _create_process()'s inner wrapper() function.
  5. the main process receives both events, in either order. The quit event results in the main process sending stop signals to all the child processes, while the stopped event increments a counter in the main loop, which causes it to exit after having received as many stopped events as there are processes.
  6. the simulation process receives the stop event and exits its loop, sending in turn a stopped event to the main process.
  7. the main process has now received 2 stopped events in total and concludes that all child processes are stopped, or at least on their way to being stopped. As a result, the main loop is exited.
  8. the run() function flushes the queues that the child processes have written to.
  9. the child processes are joined.

The problem is that quite often (but not always) the program will hang upon trying to join the simulation process, as per the log below.

[...]
[INFO/ui] process exiting with exitcode 0
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/simulation] Queue._start_thread()
[DEBUG/simulation] doing self._thread.start()
[DEBUG/simulation] starting thread to feed data to pipe
[DEBUG/simulation] ... done self._thread.start()
[DEBUG/simulation] telling queue thread to quit
[DEBUG/MainProcess] all child processes (2) should have been stopped!
[INFO/simulation] process shutting down
[DEBUG/simulation] running all "atexit" finalizers with priority >= 0
[DEBUG/simulation] telling queue thread to quit
[DEBUG/simulation] running the remaining "atexit" finalizers
[DEBUG/simulation] joining queue thread
[DEBUG/MainProcess] joining process <Process(simulation, started)>
[DEBUG/simulation] feeder thread got sentinel -- exiting
[DEBUG/simulation] ... queue thread joined
[DEBUG/simulation] joining queue thread

Stopping the execution through a Ctrl + C in the shell results in these mangled tracebacks:

Process simulation:
Traceback (most recent call last):
Traceback (most recent call last):
  File "./debug.py", line 224, in <module>
    run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 257, in _bootstrap
    util._exit_function()
  File "./debug.py", line 92, in run
    process.join()  #< This doesn't work.
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 312, in _exit_function
    _run_finalizers()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 121, in join
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 252, in _run_finalizers
    finalizer()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185, in __call__
    res = self._callback(*self._args, **self._kwargs)
    res = self._popen.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 54, in wait
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 196, in _finalize_join
    thread.join()
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 30, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1060, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1076, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
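
For context, the hang itself can be reproduced in isolation: a child process that has put more data onto a multiprocessing.Queue than the pipe buffer can hold will not finish exiting until that data is consumed, so joining it from the parent blocks. The snippet below is a minimal, separate illustration of that behaviour, not part of my program:

import multiprocessing

def _producer(q):
    # Put more data than the OS pipe buffer can hold: the queue's feeder
    # thread cannot finish writing it, and the child's exit handler waits
    # for that thread before the process can terminate.
    q.put(b'x' * (1 << 20))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_producer, args=(q,))
    p.start()
    p.join()  # Hangs until another process consumes the queue.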

As for the code, here is a stripped-down version of it (which is why it may seem incomplete in places):

#!/usr/bin/env python3

import logging
import multiprocessing
import pickle
import queue
import time

from collections import namedtuple

_LOGGER = multiprocessing.log_to_stderr()
_LOGGER.setLevel(logging.DEBUG)

_BUFFER_SIZE = 4
_DATA_LENGTH = 2 ** 12

_STATUS_SUCCESS = 0
_STATUS_FAILURE = 1

_EVENT_ERROR = 0
_EVENT_QUIT = 1
_EVENT_STOPPED = 2

_MESSAGE_STOP = 0
_MESSAGE_EVENT = 1
_MESSAGE_SIMULATION_UPDATE = 2

_Message = namedtuple('_Message', ('type', 'value',))
_StopMessage = namedtuple('_StopMessage', ())
_EventMessage = namedtuple('_EventMessage', ('type', 'value',))
_SimulationUpdateMessage = namedtuple('_SimulationUpdateMessage', ('state',))

_MESSAGE_STRUCTS = {
    _MESSAGE_STOP: _StopMessage,
    _MESSAGE_EVENT: _EventMessage,
    _MESSAGE_SIMULATION_UPDATE: _SimulationUpdateMessage
}

def run():
    # Messages from the main process to the child ones.
    downward_queue = multiprocessing.Queue()
    # Messages from the child processes to the main one.
    upward_queue = multiprocessing.Queue()
    # Messages from the simulation process to the UI one.
    simulation_to_ui_queue = multiprocessing.Queue(maxsize=_BUFFER_SIZE)

    # Regroup all the queues that can be written by child processes.
    child_process_queues = (upward_queue, simulation_to_ui_queue,)

    processes = (
        _create_process(
            _simulation,
            upward_queue,
            name='simulation',
            args=(
                simulation_to_ui_queue,
                downward_queue
            )
        ),
        _create_process(
            _ui,
            upward_queue,
            name='ui',
            args=(
                upward_queue,
                simulation_to_ui_queue,
                downward_queue
            )
        )
    )

    try:
        for process in processes:
            process.start()

        _main(downward_queue, upward_queue, len(processes))
    finally:
        # while True:
        #     alive_processes = tuple(process for process in processes
        #                             if process.is_alive())
        #     if not alive_processes:
        #         break

        #     _LOGGER.debug("processes still alive: %s" % (alive_processes,))

        for q in child_process_queues:
            _flush_queue(q)

        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # process.terminate()  #< This works!
            process.join()  #< This doesn't work.

def _main(downward_queue, upward_queue, process_count):
    try:
        stopped_count = 0
        while True:
            message = _receive_message(upward_queue, False)
            if message is not None and message.type == _MESSAGE_EVENT:
                event_type = message.value.type
                if event_type in (_EVENT_QUIT, _EVENT_ERROR):
                    break
                elif event_type == _EVENT_STOPPED:
                    stopped_count += 1
                    if stopped_count >= process_count:
                        break
    finally:
        # Whatever happens, make sure that all child processes have stopped.
        if stopped_count >= process_count:
            return

        # Send a 'stop' signal to all the child processes.
        for _ in range(process_count):
            _send_message(downward_queue, True, _MESSAGE_STOP)

        while True:
            message = _receive_message(upward_queue, False)
            if (message is not None
                    and message.type == _MESSAGE_EVENT
                    and message.value.type == _EVENT_STOPPED):
                stopped_count += 1
                if stopped_count >= process_count:
                    _LOGGER.debug(
                        "all child processes (%d) should have been stopped!"
                        % stopped_count
                    )
                    break

def _simulation(simulation_to_ui_queue, downward_queue):
    simulation_state = [i * 0.123 for i in range(_DATA_LENGTH)]

    # When the queue is full (possibly from reaching _BUFFER_SIZE), the next
    # solve is computed and kept around until the queue has room again.
    next_solve_message = None
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if next_solve_message is None:
            # _step(simulation_state)

            # Somehow the copy (pickle) seems to increase the chances for
            # the issue to happen.
            next_solve_message = _SimulationUpdateMessage(
                state=pickle.dumps(simulation_state)
            )

        status = _send_message(simulation_to_ui_queue, False,
                               _MESSAGE_SIMULATION_UPDATE,
                               **next_solve_message._asdict())
        if status == _STATUS_SUCCESS:
            next_solve_message = None

def _ui(upward_queue, simulation_to_ui_queue, downward_queue):
    time_start = -1.0
    previous_time = 0.0
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if time_start < 0:
            current_time = 0.0
            time_start = time.perf_counter()
        else:
            current_time = time.perf_counter() - time_start

        message = _receive_message(simulation_to_ui_queue, False)

        if current_time > 1.0:
            _LOGGER.debug("asking to quit")
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_QUIT, value=None)
            break

        previous_time = current_time

def _create_process(target, upward_queue, name='', args=()):
    def wrapper(function, upward_queue, *args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_ERROR, value=None)
        finally:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_STOPPED, value=None)
            upward_queue.close()

    process = multiprocessing.Process(
        target=wrapper,
        name=name,
        args=(target, upward_queue) + args,
        kwargs={}
    )
    return process

def _receive_message(q, block):
    try:
        message = q.get(block=block)
    except queue.Empty:
        return None

    return message

def _send_message(q, block, message_type, **kwargs):
    message_value = _MESSAGE_STRUCTS[message_type](**kwargs)
    try:
        q.put(_Message(type=message_type, value=message_value), block=block)
    except queue.Full:
        return _STATUS_FAILURE

    return _STATUS_SUCCESS

def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass

if __name__ == '__main__':
    run()

Related questions on Stack Overflow and hints in Python's docs basically boil down to needing to flush the queues before joining the processes, which I believe I've been trying to do here. I realize that the simulation queue could still be trying to push the (potentially large) buffered data onto the pipe by the time the program tries to flush it upon exiting, thus still ending up with non-empty queues. This is why I tried to ensure that all the child processes were stopped before reaching this point. Now, looking at the log above and at the additional log output after uncommenting the while True loop checking for alive processes, it appears that the simulation process simply doesn't want to shut down completely even though its target function has definitely exited. Could this be the reason for my problem?
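
For what it's worth, the multiprocessing docs also mention Queue.cancel_join_thread() as a way to let a process exit without waiting for its queue to be flushed, at the cost of possibly losing the buffered data. A minimal sketch of that escape hatch, separate from my program:

import multiprocessing

def _producer(q):
    q.put(b'x' * (1 << 20))
    # Tell the queue not to block process exit on flushing buffered data
    # to the pipe; anything still buffered may be lost.
    q.cancel_join_thread()

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_producer, args=(q,))
    p.start()
    p.join()  # Returns even though the queue was never drained.

I'd rather not risk losing data, though, which is why I'm looking for a cleaner way.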

If so, how am I supposed to deal with it cleanly? Otherwise, what am I missing here?

Tested with Python 3.4 on Mac OS X 10.9.5.

PS: I'm wondering if this could be related to this bug?

Upvotes: 1

Views: 998

Answers (2)

Julienm

Reputation: 188

I recently ran into a use case similar to yours: multiple processes (up to 11), one input queue, one output queue, but with a very heavy output queue. I was getting an overhead of up to 5 seconds (!) using your suggestion to perform while process.is_alive(): flush_the_queues() before the process.join().

I reduced that overhead to 0.7 seconds by relying on a multiprocessing.Manager list instead of a multiprocessing.Queue for the output queue. The manager list doesn't need any flushing. I might also look for an alternative to the input queue if I can.

Full example here:

import multiprocessing
import queue
import time


PROCESSES = multiprocessing.cpu_count() - 1
processes = []


def run():
    start = time.time()

    input_queue = multiprocessing.Queue()
    _feed_input_queue(input_queue)

    with multiprocessing.Manager() as manager:
        output_list = manager.list()

        for _ in range(PROCESSES):
            p = multiprocessing.Process(target=_execute, args=(input_queue, output_list))
            processes.append(p)
            p.start()

        print(f"Time to process = {time.time() - start:.10f}")

        start = time.time()
        
        for p in processes:
            while p.is_alive():  # in principle we could get rid of this if we find an alternative to the output queue
                _flush_queue(input_queue)
            p.join()

        print(f"Time to join = {time.time() - start:.10f}")
        # from here you can do something with the output_list


def _feed_input_queue(input_queue):
    for i in range(10000):
        input_queue.put(i)


def _execute(input_queue: multiprocessing.Queue, output_list: list):
    while not input_queue.empty():
        input_item = input_queue.get()
        output_list.append(do_and_return_something_heavy(input_item))
    return True


def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass


def do_and_return_something_heavy(input_item):
    return str(input_item) * 100000


if __name__ == '__main__':
    run()

Output

Time to process = 0.1855618954
Time to join = 0.6889970303

Tested on Python 3.6.

Upvotes: 0

ChristopherC

Reputation: 1665

Sounds like the issue was indeed due to some delay in pushing the data through the queue, causing the flushes to be ineffective because they fired too early.

A simple while process.is_alive(): flush_the_queues() seems to do the trick!
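
Concretely, the finally block in run() ends up along these lines; this is just a sketch of the change, reusing the names from the code in the question:

    finally:
        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # Keep draining the queues the child writes to until the child
            # has actually exited; only then is join() guaranteed to return.
            while process.is_alive():
                for q in child_process_queues:
                    _flush_queue(q)
            process.join()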

Upvotes: 1
