cdonts

Reputation: 9599

How to inherit from a multiprocessing queue?

With the following code, it seems that the queue instance passed to the worker isn't initialized:

from multiprocessing import Process
from multiprocessing.queues import Queue

class MyQueue(Queue):

    def __init__(self, name):
        Queue.__init__(self)
        self.name = name

def worker(queue):
    print queue.name

if __name__ == "__main__":
    queue = MyQueue("My Queue")
    p = Process(target=worker, args=(queue,))
    p.start()
    p.join()

This throws:

... line 14, in worker
    print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'

I can't re-initialize the queue in the worker, because I'd lose the original value of queue.name. Passing the queue's name as a separate argument to the worker should work, but it's not a clean solution.

So, how can I inherit from multiprocessing.queues.Queue without getting this error?

Upvotes: 1

Views: 4087

Answers (1)

abarnert

Reputation: 365617

On POSIX, Queue objects are shared to the child processes by simple inheritance.*

On Windows, that isn't possible, so it has to pickle the Queue, send it over a pipe to the child, and unpickle it.

(This may not be obvious, because if you actually try to pickle a Queue yourself, you get an exception: RuntimeError: MyQueue objects should only be shared between processes through inheritance. If you look through the source, you'll see that this is really a lie: it only raises this exception if you try to pickle a Queue when multiprocessing is not in the middle of spawning a child process.)
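To see that behaviour for yourself, here is a minimal sketch that tries to pickle a queue by hand, outside of any child-process launch. The helper name try_pickle_queue is made up for this illustration; the rest is the standard multiprocessing and pickle API.

```python
import multiprocessing
import pickle


def try_pickle_queue():
    """Pickle a queue outside of a child launch and return the error text.

    Hypothetical helper for illustration. Returns None if no error is raised.
    """
    queue = multiprocessing.Queue()
    try:
        pickle.dumps(queue)
    except RuntimeError as exc:
        # Raised by Queue.__getstate__ because no child is being spawned.
        return str(exc)
    finally:
        # Release the queue's pipe and locks.
        queue.close()
        queue.join_thread()
    return None


if __name__ == "__main__":
    print(try_pickle_queue())
```

The same pickle call is allowed while multiprocessing itself is launching a child, which is how the queue reaches the worker on Windows.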

Of course generic pickling and unpickling wouldn't do any good, because you'd end up with two identical queues, not the same queue in two processes. So, multiprocessing extends things a bit, by adding a register_after_fork mechanism for objects to use when unpickling.** If you look at the source for Queue, you can see how it works.

But you don't really need to know how it works to hook it; you can hook it the same way as any other class's pickling. For example, this should work:***

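# Add these two methods to the MyQueue class:
def __getstate__(self):
    # Bundle the extra attribute with the state Queue already pickles.
    return self.name, super(MyQueue, self).__getstate__()

def __setstate__(self, state):
    # Split off the name, then let Queue rebuild itself from the rest.
    self.name, state = state
    super(MyQueue, self).__setstate__(state)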

For more details, the pickle documentation explains the different ways you can influence how your class is pickled.
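The chaining pattern can be tried without any processes at all. The following is only a sketch: Base, Unhooked, Named and round_trip are hypothetical stand-ins invented for this illustration, with Base playing the role of a class (like Queue) that already defines its own __getstate__ and __setstate__. It shows the subclass attribute getting lost without the hooks and surviving with them.

```python
import pickle


class Base(object):
    """Hypothetical stand-in for a class that customizes its own pickling."""

    def __init__(self):
        self.items = [1, 2, 3]

    def __getstate__(self):
        # Only the parent's own data is saved.
        return (tuple(self.items),)

    def __setstate__(self, state):
        (items,) = state
        self.items = list(items)


class Unhooked(Base):
    """Adds an attribute but relies on the parent's pickling, so it is lost."""

    def __init__(self, name):
        super(Unhooked, self).__init__()
        self.name = name


class Named(Base):
    """Adds an attribute and chains to the parent's hooks to keep it."""

    def __init__(self, name):
        super(Named, self).__init__()
        self.name = name

    def __getstate__(self):
        return self.name, super(Named, self).__getstate__()

    def __setstate__(self, state):
        self.name, state = state
        super(Named, self).__setstate__(state)


def round_trip(obj):
    """Pickle and unpickle obj, as happens when it is sent to a child."""
    return pickle.loads(pickle.dumps(obj))
```

Unpickling never calls __init__, so round_trip(Unhooked("x")) comes back without a name attribute, which is the same symptom as the AttributeError in the question.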

(If it doesn't work, and I haven't made a stupid mistake… then you do have to know at least a little about how it works to hook it… but most likely just to figure out whether to do your extra work before or after the _after_fork(), which would just require swapping the last two lines…)
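For readers on Python 3, here is a hedged sketch of the whole thing put together. It assumes Python 3, where multiprocessing.queues.Queue requires a ctx keyword argument; the optional ctx parameter on MyQueue is an addition for this sketch, not part of the question's code. It also uses the "spawn" start method explicitly, so the queue goes through pickling on POSIX as well as on Windows.

```python
import multiprocessing
from multiprocessing.queues import Queue


class MyQueue(Queue):

    def __init__(self, name, ctx=None):
        # Assumption: Python 3, where Queue.__init__ needs a context.
        # Fall back to the default context if none is given.
        if ctx is None:
            ctx = multiprocessing.get_context()
        super().__init__(ctx=ctx)
        self.name = name

    def __getstate__(self):
        # Only legal while multiprocessing is launching a child process.
        return self.name, super().__getstate__()

    def __setstate__(self, state):
        self.name, state = state
        super().__setstate__(state)


def worker(queue):
    print(queue.name)


if __name__ == "__main__":
    # "spawn" forces the pickle path that Windows always uses.
    ctx = multiprocessing.get_context("spawn")
    queue = MyQueue("My Queue", ctx=ctx)
    p = ctx.Process(target=worker, args=(queue,))
    p.start()
    p.join()
```

Run it as a script file rather than from an interactive prompt, since the spawned child has to be able to re-import the module that defines worker and MyQueue.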


* I'm not sure it's actually guaranteed to use simple fork inheritance on POSIX platforms. That happens to be true on 2.7 and 3.3. But there's a fork of multiprocessing that uses the Windows-style pickle-everything on all platforms for consistency, and another one that uses a hybrid on OS X to allow using CoreFoundation in single-threaded mode, or something like that, and it's clearly doable that way.

** Actually, I think Queue is only using register_after_fork for convenience, and could be rewritten without it… but it's depending on the magic that Pipe does in its _after_fork on Windows, or Lock and BoundedSemaphore on POSIX.

*** This is only correct because I happen to know, from reading the source, that Queue is a new-style class, doesn't override __reduce__ or __reduce_ex__, and never returns a falsey value from __getstate__. If you didn't know that, you'd have to write more code.

Upvotes: 4
