Reputation: 9599
With the following code, it seems that the queue instance passed to the worker isn't initialized:
from multiprocessing import Process
from multiprocessing.queues import Queue

class MyQueue(Queue):
    def __init__(self, name):
        Queue.__init__(self)
        self.name = name

def worker(queue):
    print queue.name

if __name__ == "__main__":
    queue = MyQueue("My Queue")
    p = Process(target=worker, args=(queue,))
    p.start()
    p.join()
This throws:
... line 14, in worker
print queue.name
AttributeError: 'MyQueue' object has no attribute 'name'
I can't re-initialize the queue, because I'd lose the original value of queue.name. I could pass the queue's name to the worker as a separate argument (that should work), but it's not a clean solution.
So, how can I inherit from multiprocessing.queues.Queue without getting this error?
Upvotes: 1
Views: 4087
Reputation: 365617
On POSIX, Queue objects are shared with child processes by simple inheritance.*
On Windows, that isn't possible, so multiprocessing has to pickle the Queue, send it over a pipe to the child, and unpickle it there.
(This may not be obvious, because if you actually try to pickle a Queue, you get an exception: RuntimeError: MyQueue objects should only be shared between processes through inheritance. If you look through the source, you'll see that this is really a lie: it only raises this exception if you try to pickle a Queue when multiprocessing is not in the middle of spawning a child process.)
Of course, generic pickling and unpickling wouldn't do any good, because you'd end up with two identical queues, not the same queue in two processes. So multiprocessing extends things a bit by adding a register_after_fork mechanism for objects to use when unpickling.** If you look at the source for Queue, you can see how it works.
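The "two identical queues" problem is ordinary pickle behaviour: a round trip builds a new, independent object rather than a second handle to the original. A small Python 3 sketch with a hypothetical Box class shows it:

```python
import pickle


class Box:
    """Hypothetical stand-in for any object carrying mutable state."""

    def __init__(self):
        self.items = []


original = Box()
clone = pickle.loads(pickle.dumps(original))  # what naive pickling would give a child
clone.items.append("hello")

print(clone is original)  # False
print(original.items)     # []: the clone has its own, separate list
```

Applied to a queue, the child would get its own queue, which is why multiprocessing needs the extra after-fork machinery to rewire the unpickled copy to the same underlying pipe and locks.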
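But you don't really need to know how it works to hook it; you can hook it the same way you'd hook any other class's pickling. For example, adding these two methods to MyQueue should work:***
    def __getstate__(self):
        # Bundle the extra attribute with the base Queue's own pickled state
        return self.name, super(MyQueue, self).__getstate__()

    def __setstate__(self, state):
        # Unpack the extra attribute, then let Queue restore its pipes and locks
        self.name, state = state
        super(MyQueue, self).__setstate__(state)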
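Because the real Queue only allows pickling while a child is being spawned, the hook is easiest to see with a stand-in. In this Python 3 sketch, FakeQueue, NaiveQueue and FixedQueue are hypothetical classes: FakeQueue imitates Queue by pickling only its own internals, NaiveQueue mirrors the question's subclass, and FixedQueue adds the two methods above. A plain pickle round trip plays the role of the Windows spawn step.

```python
import pickle


class FakeQueue:
    """Hypothetical stand-in for Queue: pickles only its own internals."""

    def __init__(self):
        self._internals = "pipes and locks"

    def __getstate__(self):
        return self._internals

    def __setstate__(self, state):
        self._internals = state


class NaiveQueue(FakeQueue):
    """Like the question's MyQueue: extra attribute, no pickling hooks."""

    def __init__(self, name):
        super().__init__()
        self.name = name


class FixedQueue(FakeQueue):
    """Wraps the base state together with the extra attribute."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def __getstate__(self):
        return self.name, super().__getstate__()

    def __setstate__(self, state):
        self.name, state = state
        super().__setstate__(state)


naive = pickle.loads(pickle.dumps(NaiveQueue("My Queue")))
fixed = pickle.loads(pickle.dumps(FixedQueue("My Queue")))
print(hasattr(naive, "name"))        # False: the base __getstate__ dropped it
print(fixed.name, fixed._internals)  # My Queue pipes and locks
```

The naive subclass fails for exactly the reason in the question: the base class's __getstate__ decides what survives, so anything the subclass adds must be folded into that state explicitly.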
For more details, the pickle documentation explains the different ways you can influence how your class is pickled.
(If this doesn't work, and I haven't made a silly mistake… then you do have to know at least a little about how it works to hook it… but most likely just to figure out whether to do your extra work before or after the _after_fork() call, which would only require swapping the last two lines…)
* I'm not sure it's actually guaranteed to use simple fork inheritance on POSIX platforms. That happens to be true on 2.7 and 3.3. But there's a fork of multiprocessing that uses the Windows-style pickle-everything approach on all platforms for consistency, and another that uses a hybrid on OS X to allow using CoreFoundation in single-threaded mode, or something like that, so it's clearly doable that way.
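On Python 3.4 and later the standard library exposes this choice directly as "start methods", and get_context("spawn") selects the pickle-everything strategy even on POSIX, which is a convenient way to reproduce the Windows behaviour from the question. Note also that on Python 3, multiprocessing.queues.Queue takes a keyword-only ctx argument, so the question's bare Queue.__init__(self) call would fail there. A minimal sketch of a Python 3 port of MyQueue, assuming Python 3.4+ (the names are the question's, the ctx plumbing is my adaptation):

```python
import multiprocessing as mp
from multiprocessing.queues import Queue


class MyQueue(Queue):
    def __init__(self, name, *, ctx):
        # Python 3's Queue needs an explicit context object
        super().__init__(ctx=ctx)
        self.name = name

    def __getstate__(self):
        # Same hook as above: extra attribute plus the base Queue's state
        return self.name, super().__getstate__()

    def __setstate__(self, state):
        self.name, state = state
        super().__setstate__(state)


# Lists the start methods available on this platform
print(mp.get_all_start_methods())

# Request the Windows-style strategy explicitly, on any platform
ctx = mp.get_context("spawn")
queue = MyQueue("My Queue", ctx=ctx)
print(ctx.get_start_method(), queue.name)  # spawn My Queue
queue.close()
```

Processes created with ctx.Process(...) would then receive the queue by pickling, exercising the two hooks.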
** Actually, I think Queue only uses register_after_fork for convenience and could be rewritten without it… but it depends on the magic that Pipe does in its _after_fork on Windows, or that Lock and BoundedSemaphore do on POSIX.
*** This is only correct because I happen to know, from reading the source, that Queue is a new-style class, doesn't override __reduce__ or __reduce_ex__, and never returns a falsey value from __getstate__. If you didn't know that, you'd have to write more code.
Upvotes: 4