Reputation: 322
Suppose I have a number of items that I put in a queue for other processes to deal with. The items are rather large in memory, so I limit the queue size. At some point I will have no more things to put in the queue. How can I signal the other processes that the queue is closed?
One option would be to close the child processes when the queue is empty, but this relies on the queue being emptied slower than it is being filled.
The documentation of multiprocessing.Queue describes the following method:
close()
Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.
Is it safe to call close while there are still items in the queue? Are these items guaranteed to be processed? How can another process know that the queue is closed?
Upvotes: 1
Views: 2906
Reputation: 17561
A multiprocessing queue is simply a pipe with a lock to avoid concurrent reads/writes from different processes.
A pipe typically has two ends, a read end and a write end. When a process tries to read from a pipe, the OS first serves whatever data is already in the pipe, but if the pipe is empty, it suspends the reader and checks whether any process can still write to the write end. If someone can, the OS keeps the reader suspended until something is written; if no process can write to the pipe any more, the OS delivers an end-of-file to the reader, which wakes it up and tells it "don't wait for a message, nobody can send one on this pipe".
In the case of a queue it is different: the reading process holds both the read end and the write end of the pipe, so the number of processes that can write to the queue is never zero. Reading from a queue that no other process can write to therefore blocks indefinitely; the reader has no direct way of knowing that the other processes have closed the queue when they do.
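For illustration, here is a minimal sketch of that end-of-file behaviour using multiprocessing.Pipe; the process layout, the names and the three test messages are made up for this example and are not part of the queue machinery. Once every copy of the write end has been closed, recv() raises EOFError instead of blocking forever:

from multiprocessing import Pipe, Process

def reader(read_end, write_end):
    write_end.close()              # drop the inherited write end, otherwise this
                                   # process keeps the pipe writable for itself
    while True:
        try:
            item = read_end.recv() # blocks until data arrives or end-of-file
        except EOFError:           # no process can write to the pipe any more
            print("reader: end of file, no writers left")
            break
        print("reader: got", item)

if __name__ == "__main__":
    read_end, write_end = Pipe(duplex=False)
    p = Process(target=reader, args=(read_end, write_end))
    p.start()
    read_end.close()               # the parent only writes
    for i in range(3):
        write_end.send(i)
    write_end.close()              # last write end closed -> reader sees EOF
    p.join()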
The way the multiprocessing library itself handles this in its pools is to send a message on the queue that terminates the workers. For example, the reader can terminate once it sees None on the pipe, or some predefined object or string like "END" or "CLOSE". Since this will be the last item on the queue, there should be no items after it, and the reader terminates once it reads it. If you have multiple readers, you should send one end message per reader.
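A bare-bones sketch of that sentinel pattern (the SENTINEL name, the worker function and the item counts here are made up for illustration, not something the library prescribes):

from multiprocessing import Process, Queue

SENTINEL = None                     # could also be a string like "END" or "CLOSE"

def worker(q):
    while True:
        item = q.get()              # blocks until something arrives
        if item is SENTINEL:        # last item on the queue: stop reading
            break
        print("processing", item)

if __name__ == "__main__":
    q = Queue()
    readers = [Process(target=worker, args=(q,)) for _ in range(2)]
    for r in readers:
        r.start()
    for item in range(10):
        q.put(item)
    for _ in readers:               # one end message per reader
        q.put(SENTINEL)
    for r in readers:
        r.join()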
But what if a child process crashes, or for some reason never sends the end message? Your main process will be stuck on the get and suspended indefinitely. So if you are using a queue manually, you should take precautions to make sure this doesn't happen, such as setting a timeout and monitoring the other writers in another thread.
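One possible shape for that precaution, assuming the reader can get hold of the writer's Process handle; the helper names and the 1-second timeout are assumptions made for this sketch, not the only way to do it:

import queue
from multiprocessing import Process, Queue

def flaky_producer(q):
    q.put("only item")                      # exits (or crashes) without sending the end message

def careful_reader(q, producer):
    while True:
        try:
            item = q.get(timeout=1)         # never block forever on get
        except queue.Empty:
            if not producer.is_alive():     # writer is gone and the queue is drained
                print("reader: producer died without an end message, giving up")
                return
            continue                        # writer still alive, keep waiting
        if item is None:                    # the normal end message
            return
        print("reader: got", item)

if __name__ == "__main__":
    q = Queue()
    producer = Process(target=flaky_producer, args=(q,))
    producer.start()
    careful_reader(q, producer)
    producer.join()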
Upvotes: 1
Reputation: 58534
This is a common scenario: how do I tell all queue consumers that no more items will be enqueued? Multiprocessing apps using POSIX message queues, datagram sockets, or even just named pipes, for example, might all face this.
The easiest thing to do here would be to enqueue a single, special "all done" message, which each consumer receives and puts() back on the queue for the next consumer to do the same.
(close() is indeed safe but inapplicable here. Any "in flight" items will be safely enqueued, but close() doesn't tell the consumers that no more producers remain.)
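A rough sketch of that hand-back pattern; the ALL_DONE marker, the consume function and the consumer count are illustrative names and values, not a fixed API:

from multiprocessing import Process, Queue

ALL_DONE = "ALL_DONE"               # the special "all done" message

def consume(q):
    while True:
        item = q.get()
        if item == ALL_DONE:
            q.put(ALL_DONE)         # put the marker back for the next consumer
            break
        print("consumed", item)

if __name__ == "__main__":
    q = Queue()
    consumers = [Process(target=consume, args=(q,)) for _ in range(3)]
    for c in consumers:
        c.start()
    for item in range(10):
        q.put(item)
    q.put(ALL_DONE)                 # a single end marker is enough for all consumers
    for c in consumers:
        c.join()

Note that the last consumer leaves the marker sitting on the queue, which is harmless here but worth remembering if the queue is reused afterwards.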
Upvotes: 2
Reputation: 834
Is this a theoretical question, or do you have some code you're trying to get to work?
Answering the first question: yes, you can call the close() method on a multiprocessing.Queue while there are still items in the queue, but note that the method only indicates to other processes that no more data will be put on the queue by the current process. The items that are already in the queue will still be processed by the other processes.
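For example, on the producer side this is safe (a sketch only; join_thread() just waits for buffered items to be flushed, it is not required for close() to work):

from multiprocessing import Queue

q = Queue()
q.put("some large item")
q.close()          # this process will put nothing more on the queue
q.join_thread()    # wait until the feeder thread has flushed buffered items to the pipe
# consumers in other processes can still get() the item that was already enqueued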
And you could place a sentinel value in the queue that the other processes can then check for. For example, the check could look like this:
from multiprocessing import Process, Queue, Event
from queue import Empty

def worker(queue, event):
    # Continuously check the event flag while it is not set
    while not event.is_set():
        try:
            # Get an item from the queue with a 1 second timeout
            item = queue.get(timeout=1)
            if item is None:
                # Sentinel value: signal the other workers and stop
                event.set()
                print("Worker: Queue is closed")
                break
            print("Worker: Processing item {}".format(item))
            # Process the item here
        except Empty:
            # The queue is empty and the timeout was reached, try again
            pass
def handleQue():
    # Create a queue and an event flag
    queue = Queue()
    event = Event()
    # Start three worker processes
    processes = [Process(target=worker, args=(queue, event)) for i in range(3)]
    for process in processes:
        process.start()
    # Put items in the queue
    for i in range(10):
        queue.put(i)
        print("Main Process: Putting item {} in the queue".format(i))
    # Signal to the other processes that the queue is closed,
    # one sentinel value per worker
    for i in range(3):
        queue.put(None)
        print("Main Process: Putting sentinel value in the queue")
    for process in processes:
        process.join()
    event.clear()

if __name__ == "__main__":
    handleQue()
Upvotes: 1