Reputation: 2678
Note: I want to use multiprocessing.Queue across multiple processes, but I found that this issue occurs even in a single-process situation. So, the following code uses a single process to simplify the question.
There is a similar question: Broken pipe error with multiprocessing.Queue.
The answer in that post shows that this issue happens because the main thread exits before the queue's feeder thread finishes its job. The way he fixed it was by adding sleep(0.1)
to his code:
import multiprocessing
import time

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print(i)
        q.put(i)
    time.sleep(0.1)  # Just enough to let the Queue finish

if __name__ == "__main__":
    main()
But I think sleep is not a reliable method for production code, so I tried to use join_thread
instead. You can see my code below, but unfortunately it does not work. Does anyone know how to do this without sleep?
import multiprocessing
import time

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # time.sleep(4)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()
Upvotes: 4
Views: 6165
Reputation: 8027
Program:
import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)

if __name__ == '__main__':
    main()
Output:
Traceback (most recent call last):
File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
send_bytes(obj)
File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes
self._send(header + buf)
File "/usr/local/Cellar/[email protected]/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
A BrokenPipeError
exception is raised when the queue thread of a multiprocessing.Queue
still sends enqueued items to the write end of the queue pipe after the read end has been closed. The read end is closed automatically during its garbage collection, which follows the garbage collection of the queue itself (the write end is not garbage collected because it is also referenced by the queue thread).
I think this is a bug so I have opened a pull request on GitHub.
A workaround is to make sure that no enqueued items are left for the queue thread to send when the queue is garbage collected, by dequeuing all enqueued items beforehand:
import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)
    q.get()

if __name__ == '__main__':
    main()
Upvotes: 1
Reputation: 3490
Let us first describe some details of multiprocessing.Queue.
When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.
The pipe is created via multiprocessing.Pipe(duplex=False), which is built on a plain OS-level pipe on Unix; a socket.socketpair()
behaves the same way for the purpose of this demonstration.
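To make this asynchrony visible, here is a small sketch (my own, not from the original answer) showing that put() returns before the feeder thread has flushed the item into the pipe:

```python
import multiprocessing
import time

def show_feeder_delay():
    q = multiprocessing.Queue()
    q.put("hello")
    # put() only appends to an internal buffer; the background feeder
    # thread pickles the item and writes it to the pipe later, so
    # empty() may still report True immediately after the put().
    first = q.empty()
    time.sleep(0.5)
    second = q.empty()  # False: the item has reached the pipe by now
    q.get()             # drain so the queue is garbage collected cleanly
    return first, second

if __name__ == "__main__":
    print(show_feeder_delay())
```

The first empty() result is timing-dependent; the second is reliably False because half a second is ample time for the feeder thread to flush one item.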
queue.close()
is designed for the multi-process case and it does two things:

1. close the reader (important!)
2. put a sentinel value into queue.buffer; the background thread will exit when it encounters that value

In the single-process case, the reason why queue.close()
does not work is step 1: if there is still some data in the buffer, the background thread will continue to write data to an already closed pipe, which leads to the Broken pipe
error.
A simple example to demo the error:

import socket

reader, writer = socket.socketpair()
writer.send(b"1")
# queue.close() will internally call reader.close()
reader.close()
# the next send raises BrokenPipeError: [Errno 32] Broken pipe
writer.send(b"2")
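Applying this to the question's code: if every item is dequeued before close(), the buffer is empty by the time the reader is closed, and close() followed by join_thread() returns cleanly. A minimal sketch of this (mine, not from the answer):

```python
import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # get() blocks until the feeder thread has flushed an item, so
    # after ten gets the internal buffer is guaranteed to be empty.
    items = [q.get() for _ in range(10)]
    q.close()        # closes the read end; nothing is left to send
    q.join_thread()  # returns once the feeder thread has exited
    return items

if __name__ == "__main__":
    print(main())  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

This is essentially the same workaround as in the other answer, scaled to the asker's ten-item loop.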
In the multi-process case, closing the reader
in the main process only decrements the reference count of the underlying file descriptor (the main and child processes each hold a copy of it); it does not really close (or shut down) the pipe.
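A sketch of that multi-process case (function names are mine): the parent can call close() and join_thread() even while items are still in flight, because the child's copy of the read end keeps the pipe alive:

```python
import multiprocessing

def worker(q):
    # The child holds its own duplicate of the pipe's read end, so the
    # parent closing its copy does not close the underlying pipe.
    assert [q.get() for _ in range(3)] == [0, 1, 2]

def demo():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    for i in range(3):
        q.put(i)
    q.close()        # drops only the parent's reference to the read end
    q.join_thread()  # the feeder can still flush to the live pipe
    p.join()
    return p.exitcode  # 0 if the worker's assertion passed

if __name__ == "__main__":
    print(demo())
```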
Upvotes: 3