Przemek D

Reputation: 664

Process finishes but cannot be joined?

To accelerate a certain task, I'm subclassing Process to create a worker that will process data arriving in samples. Some managing class will feed it data and read the outputs (using two Queue instances). For asynchronous operation I'm using put_nowait and get_nowait. At the end I'm sending a special exit code to my process, upon which it breaks its internal loop. However... the process never joins. Here's a minimal reproducible example:

import multiprocessing as mp

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    # prepare
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    # get to work
    worker.start()
    in_queue.put_nowait(list(range(10**5))) # XXX
    # clean up
    print("NOTIFYING")
    in_queue.put_nowait(None)
    #out_queue.get() # XXX
    print("JOINING")
    worker.join()

Processor()

This code never completes, hanging permanently like this:

NOTIFYING
JOINING
    WORKER DEAD

Why?

I've marked two lines with XXX. For the first one: if I send less data (say, 10**4), everything finishes normally (the process joins as expected). Similarly for the second: if I get() from the output queue after notifying the worker to finish, the program also completes. I know I'm missing something, but nothing in the documentation seems relevant.

Upvotes: 0

Views: 456

Answers (1)

Robert Guggenberger

Reputation: 108

The documentation mentions that

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

and additionally that

whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.

https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

This means that the behaviour you describe is probably caused by a race condition between self.output_queue.put_nowait(received) in the worker and joining the worker with worker.join() in Processor's __init__. If joining was faster than feeding the item into the queue, everything finishes fine. If it was too slow, an item remains in the queue, and the worker will not join.

Uncommenting the out_queue.get() in the main process empties the queue, which allows joining. But since get() also needs to return if the queue happens to be empty already, using a timeout might be an option to wait out the race condition, e.g. out_queue.get(timeout=10).

It might also be important to protect the entry point of the program, especially on Windows (python multiprocessing on windows, if __name__ == "__main__").
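For illustration, here is a minimal sketch of the example from the question with both suggestions applied: the main process drains out_queue before joining, and the entry point is protected. The one-second timeout and the queue.Empty handling are illustrative choices, not requirements:

import multiprocessing as mp
import queue

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    worker.start()
    in_queue.put_nowait(list(range(10**5)))
    print("NOTIFYING")
    in_queue.put_nowait(None)
    # Drain the output queue before joining, so the worker's feeder
    # thread can flush everything it has put on the pipe. The timeout
    # makes get() return even if the queue is (or stays) empty.
    while True:
      try:
        out_queue.get(timeout=1)
      except queue.Empty:
        break
    print("JOINING")
    worker.join()


if __name__ == "__main__":  # protect the entry point (needed on Windows)
  Processor()

With the queue drained, the worker's feeder thread has nothing left to flush, so join() returns as expected.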

Upvotes: 1
