Reputation: 60839
from multiprocessing import Process, Queue

def worker_main(q):
    while True:
        message = q.get()

q = Queue()
worker = Process(target=worker_main, args=(q,))
worker.start()
q.close()
worker.join()
I expect the call to q.get in worker_main() to throw an exception and exit after q is closed. Instead, it hangs even after the queue is closed in the main process.
My use case seems to be slightly different from the common examples, which show Queue.put in the worker process and Queue.get in the main process.
In the main process I'm producing tasks that need to be distributed to a pool of worker processes via a queue. When the tasks are complete, I close the queue to signal to the worker processes that it's time to exit.
Perhaps I do not understand the documentation, but I think it's clear that future calls to get should raise an exception after close.
get([block[, timeout]])
Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.
Upvotes: 2
Views: 3298
Reputation: 52149
A multiprocessing.Queue represents a handle on a shared buffer. Notably, when processes are connected with a queue, each process has its own copy of the handle.
Calling .close() only closes the current process's handle for reading and writing.
close()
Indicate that no more data will be put on this queue by the current process. […]
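To see that the ValueError from the documentation applies only to the handle that was closed, here is a minimal sketch, assuming Python 3.8 or later. The helper name get_after_close is made up for illustration. It closes a queue and then calls get on that same handle in the same process; a worker holding its own copy of the handle would not be affected by this close.

```python
from multiprocessing import Queue

def get_after_close():
    # Hypothetical helper: close the handle in this process, then read from it.
    q = Queue()
    q.close()
    try:
        q.get(timeout=1)
    except ValueError as exc:
        # Python 3.8+: get() on a handle closed in *this* process raises ValueError.
        return type(exc).__name__
    return "no exception"

if __name__ == "__main__":
    print(get_after_close())
```

So the exception the question expects does exist, but it is raised in the process that called close(), not in the worker.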
In order to gracefully close a queue for all subscribers, send a "close message". This is usually a well-defined sentinel object such as None.
from multiprocessing import Process, Queue

def worker_main(q):
    while (message := q.get()) is not None:
        print(message)

if __name__ == "__main__":
    q = Queue()
    q.put("Hello")
    worker = Process(target=worker_main, args=(q,))
    worker.start()
    q.put("World")
    q.put(None)  # close message
    q.close()
    worker.join()
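For the pool-of-workers case described in the question, the same idea can be extended by sending one close message per worker, since each worker consumes exactly one sentinel before exiting. The following is only a sketch under assumptions: the names SENTINEL and run_pool, the results queue, and the "double the task" placeholder work are all invented for illustration and are not part of the original code.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed close message; must never be a real task value

def worker_main(q, results):
    # Consume tasks until the close message arrives, then leave the loop.
    handled = 0
    while (task := q.get()) is not SENTINEL:
        results.put(task * 2)  # placeholder for real work
        handled += 1
    return handled

def run_pool(tasks, n_workers=3):
    # Hypothetical driver: distribute tasks, then tell every worker to stop.
    tasks = list(tasks)
    q, results = Queue(), Queue()
    workers = [Process(target=worker_main, args=(q, results))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for task in tasks:
        q.put(task)
    for _ in workers:
        q.put(SENTINEL)  # one close message per worker
    # Collect one result per task before joining the workers.
    out = [results.get() for _ in tasks]
    for w in workers:
        w.join()
    return sorted(out)

if __name__ == "__main__":
    print(run_pool(range(10)))
```

Results are read before join() because a process that has put items on a queue may not exit until those items have been flushed to the pipe. None works as a sentinel here because it survives pickling as the same singleton; a bare object() would not keep its identity across processes.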
Upvotes: 3