Reputation: 664
To accelerate a certain task, I'm subclassing Process to create a worker that will process data coming in as samples. Some managing class feeds it data and reads the outputs (using two Queue instances). For asynchronous operation I'm using put_nowait and get_nowait. At the end I send a special exit code to my process, upon which it breaks its internal loop. However... that never happens. Here's a minimal reproducible example:
import multiprocessing as mp


class Worker(mp.Process):
    def __init__(self, in_queue, out_queue):
        super(Worker, self).__init__()
        self.input_queue = in_queue
        self.output_queue = out_queue

    def run(self):
        while True:
            received = self.input_queue.get(block=True)
            if received is None:
                break
            self.output_queue.put_nowait(received)
        print("\tWORKER DEAD")


class Processor():
    def __init__(self):
        # prepare
        in_queue = mp.Queue()
        out_queue = mp.Queue()
        worker = Worker(in_queue, out_queue)
        # get to work
        worker.start()
        in_queue.put_nowait(list(range(10**5)))  # XXX
        # clean up
        print("NOTIFYING")
        in_queue.put_nowait(None)
        #out_queue.get()  # XXX
        print("JOINING")
        worker.join()


Processor()
This code never completes, hanging permanently like this:
NOTIFYING
JOINING
WORKER DEAD
Why?
I've marked two lines with XXX. For the first one: if I send less data (say, 10**4 items), everything finishes normally (the processes join as expected). Similarly for the second: if I get() from the output queue after notifying the worker to finish, it also joins. I know I'm missing something, but nothing in the documentation seems relevant.
Upvotes: 0
Views: 456
Reputation: 108
The documentation mentions that
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences [...] After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising queue.Empty.
https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues
and additionally that
whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate.
https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
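As a rough sketch of that first point (my own example, not from the docs): a put_nowait() followed immediately by get_nowait() on the same queue can raise queue.Empty, because the feeder thread may not have flushed the item yet. This is timing-dependent, so it will not reproduce on every run:

import multiprocessing as mp
import queue

if __name__ == "__main__":
    q = mp.Queue()
    q.put_nowait("sample")     # handed off to a background feeder thread
    try:
        print(q.get_nowait())  # may raise queue.Empty if the feeder thread
    except queue.Empty:        # has not flushed the item to the pipe yet
        print("item not flushed yet")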
This means that the behaviour you describe is probably caused by a race condition between self.output_queue.put_nowait(received) in the worker and joining the worker with worker.join() in Processor's __init__. If joining was faster than feeding the data into the queue, everything finishes fine. If it was too slow, an item remains in the queue, and the worker will not join.
Uncommenting the out_queue.get() in the main process empties the queue, which allows joining. But since the call should still return if the queue happens to be empty already, using a timeout might be an option to wait out the race condition, e.g. out_queue.get(timeout=10).
It might also be important to protect the entry point of the main routine, especially on Windows (python multiprocessing on windows, if __name__ == "__main__").
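Putting both suggestions together, the clean-up in Processor could look roughly like the sketch below. It reuses the Worker class from the question; the results list, the drain loop and the 10-second timeout are my own choices, not something prescribed by the docs, and the last get() attempt simply waits out the timeout before the loop ends:

import multiprocessing as mp
import queue  # only for the queue.Empty exception


class Processor():
    def __init__(self):
        in_queue = mp.Queue()
        out_queue = mp.Queue()
        worker = Worker(in_queue, out_queue)
        worker.start()
        in_queue.put_nowait(list(range(10**5)))

        print("NOTIFYING")
        in_queue.put_nowait(None)

        # Drain the output queue before joining, so the worker's feeder
        # thread can flush everything and the process can exit.
        self.results = []
        while True:
            try:
                self.results.append(out_queue.get(timeout=10))
            except queue.Empty:
                break

        print("JOINING")
        worker.join()


if __name__ == "__main__":  # protect the entry point (important on Windows)
    Processor()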
Upvotes: 1