MikeiLL
MikeiLL

Reputation: 6570

multiprocessinq.Queue as attribute of Queue.Queue child

I'm trying to figure out what the following module is doing.

import Queue
import multiprocessing
import threading

class BufferedReadQueue(Queue.Queue):
    def __init__(self, lim=None):
        self.raw = multiprocessing.Queue(lim)
        self.__listener = threading.Thread(target=self.listen)
        self.__listener.setDaemon(True)
        self.__listener.start()
        Queue.Queue.__init__(self, lim)

    def listen(self):
        try:
            while True:
                self.put(self.raw.get())
        except:
            pass

    @property
    def buffered(self):
        return self.qsize()

It is only instantiated once in the calling code, and the .raw attribute, multiprocessing.Queue, gets sent to another class, which appears to inherit from multiprocessing.Process.

So as I'm seeing it, an attribute of BufferedReadQueue is being used as a Queue, but not the class (nor an instance of it) itself.

What would be a reason that BufferedReadQueue inherits from Queue.Queue and not just object, if it's not actually being used as a queue?

Upvotes: 1

Views: 133

Answers (1)

dano
dano

Reputation: 94961

It looks like BufferedReadQueue is meant to be used as a way to convert the read end of a multiprocessing.Queue into a normal Queue.Queue. Note this in __init__:

    self.__listener = threading.Thread(target=self.listen)
    self.__listener.setDaemon(True)
    self.__listener.start()

This starts up a listener thread, which just constantly tries to get items from the internal multiprocessing.Queue, and then puts all those items to self. It looks like the use-case is something like this:

def func(queue):
   queue.put('stuff')
   ...

buf_queue = BufferedReadQueue()
proc = multiprocessing.Process(target=func, args=(buf_queue.raw,))
proc.start()
out = buf_queue.get()  # Only get calls in the parent

Now, why would you do this instead of just using the multiprocessing.Queue directly? Probably because multiprocessing.Queue has some shortcomings that Queue.Queue doesn't. For example qsize(), which this BufferedReadQueue uses, is not reliable with multiprocessing.Queue:

Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

Note that this may raise NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented.

It's also possible to introspect a Queue.Queue, and peek at its contents without popping them. This isn't possible with a multiprocessing.Queue.

Upvotes: 1

Related Questions