Bob

Reputation: 6173

Python multithreading + multiprocessing BrokenPipeError (subprocess not exiting?)

I am getting a BrokenPipeError when threads that use a multiprocessing.JoinableQueue spawn processes. It seems to happen after the program has finished its work and is trying to exit, because it does everything it is supposed to do. What does it mean? Is there a way to fix this, or is it safe to ignore?

import requests
import multiprocessing
from multiprocessing import JoinableQueue
from queue import Queue
import threading


class ProcessClass(multiprocessing.Process):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func

    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()


class ThreadClass(threading.Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func

    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()


def get_urls(host, out_queue):
    r = requests.get(host)
    out_queue.put(r.text)
    print(r.status_code, host)


def get_title(text, out_queue):
    print(text.strip('\r\n ')[:5])


if __name__ == '__main__':
    def test():

        q1 = JoinableQueue()
        q2 = JoinableQueue()

        for i in range(2):
            t = ThreadClass(get_urls, q1, q2)
            t.daemon = True
            t.start()

        for i in range(2):
            t = ProcessClass(get_title, q2, None)
            t.daemon = True
            t.start()

        for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
            q1.put(host)

        q1.join()
        q2.join()

    test()
    print('Finished')

Program output:

200 http://ibm.com
<!DOC
200 http://google.com
<!doc
200 http://yahoo.com
<!DOC
200 http://apple.com
<!DOC
200 http://amazon.com
<!DOC
Finished
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes
    nread, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner
    self.run()
  File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run
    arg = self.in_queue.get()
  File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get
    res = self._recv()
  File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv
    buf = self._recv_bytes()
  File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes
    raise EOFError
EOFError
....

(same errors for the other threads omitted.)

If I switch the JoinableQueue to a queue.Queue for the multithreading part, everything is fixed, but why?

Upvotes: 3

Views: 5265

Answers (1)

dano

Reputation: 94871

This is happening because you're leaving the background threads blocked in a multiprocessing.Queue.get call when the main thread exits, but it only happens under certain conditions:

  1. A daemon thread is running and blocking on a multiprocessing.Queue.get when the main thread exits.
  2. A multiprocessing.Process is running.
  3. The multiprocessing context is something other than 'fork'.

The exception is telling you that the other end of the Connection that the multiprocessing.JoinableQueue is listening to while it's inside a get() call sent an EOF. Generally this means the other side of the Connection has closed. It makes sense that this happens during shutdown: Python is cleaning up all objects prior to exiting the interpreter, and part of that cleanup involves closing all the open Connection objects. What I haven't been able to figure out yet is why it only (and always) happens if a multiprocessing.Process has been spawned (not forked, which is why it doesn't happen on Linux by default) and is still running. I can even reproduce it if I create a multiprocessing.Process that just sleeps in a while loop; it doesn't take any Queue objects at all. For whatever reason, the presence of a running, spawned child process seems to guarantee the exception will be raised. It might simply cause the order in which things are destroyed to be just right for a race condition to occur, but that's a guess.
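Here is a minimal sketch of that reproduction, based on the three conditions above (the sleeper/blocked_getter names are just illustrative); run on a 'spawn' platform such as Windows, it should die with the same BrokenPipeError/EOFError traceback at exit:

import multiprocessing
import threading
import time


def sleeper():
    # Conditions 2 and 3: a spawned child process still running at interpreter exit.
    while True:
        time.sleep(1)


def blocked_getter(q):
    # Condition 1: a daemon thread left blocking in multiprocessing.Queue.get at exit.
    q.get()


if __name__ == '__main__':
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=sleeper)
    p.daemon = True
    p.start()

    t = threading.Thread(target=blocked_getter, args=(q,))
    t.daemon = True
    t.start()

    time.sleep(1)  # give the thread time to block inside q.get()
    # The main thread exits here; interpreter cleanup closes the queue's
    # underlying Connection, and the blocked get() sees the broken pipe.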

In any case, using a queue.Queue instead of a multiprocessing.JoinableQueue is a good way to fix it, since you don't actually need a multiprocessing.Queue between the threads (there's a short sketch of that swap after the shutdown code below). You could also make sure that the background threads and background processes are shut down before the main thread exits, by sending sentinels through their queues. So, make both run methods check for the sentinel:

def run(self):
    for arg in iter(self.in_queue.get, None):  # None is the sentinel
        self.func(arg, self.out_queue)
        self.in_queue.task_done()
    self.in_queue.task_done()  # account for the sentinel itself, so join() can return
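For reference, the two-argument form of the iter() built-in, iter(callable, sentinel), calls callable repeatedly and stops as soon as a call returns sentinel, which is why the loop above exits cleanly when a None comes off the queue. A tiny standalone illustration:

from queue import Queue

q = Queue()
for item in ('a', 'b', None):
    q.put(item)

for item in iter(q.get, None):  # stops when get() returns None
    print(item)                 # prints 'a', then 'b'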

And then send the sentinels when you're done:

    threads = []
    for i in range(2):
        t = ThreadClass(get_urls, q1, q2)
        t.daemon = True
        t.start()
        threads.append(t)

    procs = []
    for i in range(2):
        t = ProcessClass(get_title, q2, None)
        t.daemon = True
        t.start()
        procs.append(t)

    for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
        q1.put(host)

    q1.join()
    # All items have been consumed from the input queue, so start shutting down.
    # Queue all the sentinels before joining: any worker may consume any
    # sentinel, so putting one and immediately joining a specific worker
    # could deadlock.
    for _ in procs:
        q2.put(None)
    for p in procs:
        p.join()
    for _ in threads:
        q1.put(None)
    for t in threads:
        t.join()
    q2.join()
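And as mentioned above, here's a rough sketch of the queue.Queue alternative: only q1 needs to change, since it's touched exclusively by threads inside the main process. q2 is consumed by the ProcessClass children, so it has to stay a multiprocessing.JoinableQueue:

from queue import Queue
from multiprocessing import JoinableQueue

q1 = Queue()          # thread-only traffic: no pipe underneath, so nothing to break at exit
q2 = JoinableQueue()  # crosses the process boundary, must remain a multiprocessing queue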

Upvotes: 5
