Mike

Reputation: 60839

Why does multiprocessing Queue.get() block after the queue is closed?

from multiprocessing import Process, Queue

def worker_main(q):
    while True:
        message = q.get()  # expected to raise after close(); instead this blocks forever


q = Queue()
worker = Process(target=worker_main, args=(q,))
worker.start()
q.close()
worker.join()

I expect the call to q.get() in worker_main() to raise an exception and exit after q is closed. Instead, it hangs forever, even after the queue is closed in the main process.

My use case seems to be slightly different from the common examples, which show Queue.put in the worker process and Queue.get in the main process.

In the main process I'm producing tasks that need to be distributed to a pool of worker processes via a queue. However, when the tasks are complete, I close the queue to indicate to the worker processes that it's time to exit.

Perhaps I do not understand the documentation, but I think it's clear that future calls to get should raise an exception after close.

get([block[, timeout]])

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.
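
For what it's worth, the 3.8 behaviour does show up when the same process both closes and reads, which is why I expected the worker's get() to raise as well (minimal sketch, Python 3.8+):

from multiprocessing import Queue

if __name__ == "__main__":
    q = Queue()
    q.close()
    try:
        q.get()           # same process that called close()
    except ValueError as exc:
        print(exc)        # "Queue ... is closed"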

Upvotes: 2

Views: 3298

Answers (1)

MisterMiyagi

Reputation: 52149

A multiprocessing.Queue represents a handle on a shared buffer. Notably, when processes are connected with a queue, each process has its own copy of the handle.

Calling .close() only closes the current process' handle for reading and writing.

close()

Indicate that no more data will be put on this queue by the current process. […]
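
You can see this in the closing process itself: on Python 3.8+ the closed handle refuses further use, while a worker holding its own copy of the handle is unaffected and simply keeps blocking in q.get() (minimal sketch):

from multiprocessing import Queue

if __name__ == "__main__":
    q = Queue()
    q.close()                 # only this process' handle is now closed
    try:
        q.put("too late")     # refused locally on Python 3.8+
    except ValueError as exc:
        print(exc)            # "Queue ... is closed"
    # Any other process with its own copy of the handle is not affected.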


In order to gracefully close a queue for all subscribers, send a "close message". This is usually a well-defined sentinel object such as None.

from multiprocessing import Process, Queue

def worker_main(q):
    while (message := q.get()) is not None:
        print(message)


if __name__ == "__main__":
    q = Queue()
    q.put("Hello")
    worker = Process(target=worker_main, args=(q,))
    worker.start()
    q.put("World")
    q.put(None)  # close message
    q.close()
    worker.join()
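
Since the question mentions a pool of workers: each q.get() consumes one sentinel, so send one close message per worker. A sketch along those lines (NUM_WORKERS is just an illustrative name):

from multiprocessing import Process, Queue

NUM_WORKERS = 4  # illustrative pool size

def worker_main(q):
    while (message := q.get()) is not None:
        print(message)


if __name__ == "__main__":
    q = Queue()
    workers = [Process(target=worker_main, args=(q,)) for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()
    for task in ("Hello", "World"):
        q.put(task)
    for _ in workers:
        q.put(None)  # one close message per worker
    q.close()
    for w in workers:
        w.join()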

Upvotes: 3
