scott huang

Reputation: 2678

multiprocessing.Queue : Broken pipe error

Note 1: I want to use multiprocessing.Queue across multiple processes, but I found that this issue also occurs with a single process. To simplify the question, the following code uses a single process.

There is a similar question: Broken pipe error with multiprocessing.Queue.

The answer to that post shows that the issue occurs because the main thread exits before the queue thread finishes its job. The fix there was to add sleep(0.1) to the code:

import multiprocessing
import time

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print(i)
        q.put(i)
    time.sleep(0.1)  # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

But I don't think sleep is a reliable method for production code, so I tried to use join instead. You can see my code below, but unfortunately it does not work. Does anyone know how to do this without sleep?

import multiprocessing
import time


def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # time.sleep(4)
    # Attempt to wait for the queue thread with close()/join_thread()
    # instead of sleeping.
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()

Upvotes: 4

Views: 6165

Answers (2)

Géry Ogam

Reputation: 8027

Program:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)

if __name__ == '__main__':
    main()

Output:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
    send_bytes(obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes
    self._send(header + buf)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

A BrokenPipeError exception is raised when the queue thread of a multiprocessing.Queue is still sending enqueued items to the write end of the queue pipe after the read end has already been closed. The read end is closed automatically when it is garbage collected, which happens when the queue itself is garbage collected. The write end is not garbage collected, because the queue thread still references it.
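The same sequence can be reproduced without a Queue, which makes the order of events deterministic. This is a minimal sketch assuming CPython on Unix, where dropping the last reference to a Connection collects it immediately (its __del__ closes the handle) and a one-way multiprocessing.Pipe is backed by os.pipe(); the function name is hypothetical.

```python
import multiprocessing


def write_after_reader_collected():
    # One-way pipe: the reader can only receive, the writer can only send.
    reader, writer = multiprocessing.Pipe(duplex=False)
    # Dropping the last reference lets CPython garbage-collect the reader
    # right away, which closes the read end of the pipe.
    del reader
    try:
        # The writer is still alive (like the one held by the queue thread),
        # but nothing can read from the pipe any more.
        writer.send_bytes(b"item")
    except BrokenPipeError as exc:
        return exc
    finally:
        writer.close()
    return None


if __name__ == "__main__":
    print(type(write_after_reader_collected()).__name__)
```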

I think this is a bug, so I have opened a pull request on GitHub.

A workaround is to make sure that no enqueued items are left for the queue thread to send when the queue is garbage collected, by dequeuing all enqueued items beforehand:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)
    q.get()

if __name__ == '__main__':
    main()
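Applied to the ten-item loop from the question, the workaround could look like the sketch below. It assumes the consumer knows how many items were enqueued, and the helper name produce_and_drain is hypothetical. Once every item has been dequeued, close() and join_thread() no longer have buffered data to flush.

```python
import multiprocessing


def produce_and_drain(n):
    q = multiprocessing.Queue()
    for i in range(n):
        q.put(i)
    # Each get() blocks until the queue thread has written that item to the
    # pipe, so after this loop no enqueued items are left for it to send.
    items = [q.get() for _ in range(n)]
    # With an empty buffer, the queue thread only receives the shutdown
    # sentinel and exits without writing anything.
    q.close()
    q.join_thread()
    return items


if __name__ == "__main__":
    print(produce_and_drain(10))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```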

Upvotes: 1

Jacky Wang

Reputation: 3490

Let us first describe some details of multiprocessing.Queue.

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

The pipe is created via reader, writer = connection.Pipe(duplex=False), which on Unix is backed by os.pipe() (only a duplex Pipe() uses socket.socketpair()).
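The mechanism described above (an in-memory buffer, a background thread, pickled bytes written to a one-way pipe) can be modelled in a few lines. This is a simplified sketch, not CPython's implementation: the class MiniQueue is hypothetical, and the real Queue also handles locking between processes, maxsize, and shutdown.

```python
import collections
import multiprocessing
import pickle
import threading


class MiniQueue:
    """Hypothetical, simplified model of multiprocessing.Queue's data path."""

    def __init__(self):
        # One-way pipe: the reader end only receives, the writer end only sends.
        self._reader, self._writer = multiprocessing.Pipe(duplex=False)
        self._buffer = collections.deque()
        self._not_empty = threading.Condition()
        # Background "feeder" thread; daemon so it never blocks interpreter exit.
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def put(self, obj):
        # put() only buffers the object and returns immediately;
        # nothing has been written to the pipe yet.
        with self._not_empty:
            self._buffer.append(obj)
            self._not_empty.notify()

    def get(self):
        # Blocks until the feeder thread has written a message to the pipe.
        return pickle.loads(self._reader.recv_bytes())

    def _feed(self):
        while True:
            with self._not_empty:
                while not self._buffer:
                    self._not_empty.wait()
                obj = self._buffer.popleft()
            # Pickle outside the lock, then flush the bytes to the pipe.
            self._writer.send_bytes(pickle.dumps(obj))


if __name__ == "__main__":
    q = MiniQueue()
    for i in range(3):
        q.put(i)
    print([q.get() for _ in range(3)])  # [0, 1, 2]
```

Because put() returns before the write happens, anything that closes the read end while the buffer is non-empty leaves the feeder thread writing into a pipe nobody can read.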

queue.close() is designed for the multi-process case, and it does two things:

  1. It closes the reader (important!).
  2. It puts a sentinel value into the queue's internal buffer; the background thread exits when it encounters that value.

In the single-process case, queue.close() does not work because of step 1: if there is still some data in the buffer, the background thread keeps writing it to the pipe after the only read end has been closed, which leads to a Broken pipe error.
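That failing order of events can be reproduced with a plain thread and a one-way pipe. This is a simplified sketch rather than the real Queue code: the function name is hypothetical, the pending list stands in for the queue's buffer, and the reader is closed before the thread starts so the outcome does not depend on timing.

```python
import collections
import multiprocessing
import threading


def feed_after_reader_closed(pending):
    reader, writer = multiprocessing.Pipe(duplex=False)
    buffer = collections.deque(pending)
    errors = []

    def feeder():
        # Background thread draining the buffer into the pipe, playing the
        # role of the queue thread.
        try:
            while buffer:
                writer.send_bytes(buffer.popleft())
        except BrokenPipeError as exc:
            errors.append(exc)
        finally:
            writer.close()

    # Step 1 of close(): the reader is closed first ...
    reader.close()
    # ... while the thread may still have buffered data to write.
    thread = threading.Thread(target=feeder)
    thread.start()
    thread.join()
    return errors


if __name__ == "__main__":
    print(feed_after_reader_closed([b"1", b"2"]))
```

With an empty buffer (feed_after_reader_closed([])) the thread has nothing to write and no error is recorded, which is why draining the queue before closing it avoids the problem.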

A simple example that demonstrates the error:

import socket

reader, writer = socket.socketpair()
# socket.send() requires bytes in Python 3
writer.send(b"1")

# queue.close() will internally call reader.close()
reader.close()

# got a Broken pipe error
writer.send(b"2")

In the multi-process case, closing the reader in the main process only decrements the reference count of the underlying pipe (the main process and the child process share it); it does not actually close (or shut down) the pipe.
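The reference-counting point can be illustrated in a single process on Unix, with os.dup() standing in for the copy of the read end that a child process would hold. That substitution is an assumption made to keep the sketch self-contained, and the function name is hypothetical; the operating system only treats the pipe as broken once every descriptor for the read end is closed.

```python
import os


def close_one_copy_then_all():
    read_fd, write_fd = os.pipe()
    # Hypothetical stand-in for the child process's copy of the read end.
    child_copy = os.dup(read_fd)

    # Closing the main process's reader only drops one reference ...
    os.close(read_fd)
    # ... so the pipe still has a reader and this write succeeds.
    os.write(write_fd, b"x")
    received = os.read(child_copy, 1)

    # Closing the last reference really closes the read end of the pipe.
    os.close(child_copy)
    try:
        os.write(write_fd, b"y")
        broken = False
    except BrokenPipeError:
        broken = True
    finally:
        os.close(write_fd)
    return received, broken


if __name__ == "__main__":
    print(close_one_copy_then_all())  # (b'x', True) on Unix
```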

Upvotes: 3
