I am trying to implement a child class of the multiprocessing Queue in Python. The child class adds a simple Boolean flag, "ready". When I pass the queue to a new process, the ready attribute disappears. The following code demonstrates the problem:
import multiprocessing
import multiprocessing.queues


class ReadyQueue(multiprocessing.queues.Queue):
    def __init__(self, ctx, *args, **kwargs):
        super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
        self.ready = False


def ready_queue(*args, **kwargs):
    return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)


def foo(q):
    print(q.ready)


if __name__ == "__main__":
    my_queue = ready_queue()
    print(my_queue.ready)
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()
With the output:
False
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
    print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'
I implemented this workaround:
import multiprocessing
from queue import Empty
import time
import ctypes


class ReadyQueue:
    def __init__(self, *args, **kwargs):
        self.queue = multiprocessing.Queue(*args, **kwargs)
        self._ready = multiprocessing.Value(ctypes.c_bool, False)

    def set_ready(self):
        self._ready.value = True

    def set_not_ready(self):
        self._ready.value = False
        self.clear()

    def is_ready(self):
        return self._ready.value

    def clear(self):
        try:
            while True:
                self.queue.get(block=False)
        except Empty:
            pass

    def get(self, block=True, timeout=None):
        return self.queue.get(block, timeout)

    def put(self, obj, block=True, timeout=None):
        return self.queue.put(obj, block, timeout)

    def full(self):
        return self.queue.full()

    def empty(self):
        return self.queue.empty()

    def qsize(self):
        return self.queue.qsize()


def foo(q):
    while q.is_ready():
        time.sleep(1)
        q.put("hello from foo")
    print("q no longer ready, foo loop finished")


if __name__ == "__main__":
    my_queue = ReadyQueue()
    my_queue.set_ready()
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    for i in range(2):
        print(my_queue.get())
    time.sleep(2)
    print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
    my_queue.set_not_ready()
    print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))
With the output:
C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
hello from foo
hello from foo
my_queue._ready = True, qsize: 2. Setting not ready..
my_queue._ready = False, qusize: 0
q no longer ready, foo loop finished
Process finished with exit code 0
The workaround is for my ReadyQueue class not to inherit from multiprocessing.queues.Queue, but to hold a queue as an attribute instead. For convenience, I implemented the queue methods I need as thin wrappers that pass straight through to the queue attribute. I also added a clear method.
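If writing each pass-through method by hand becomes tedious, one possible variation is to forward unknown attributes to the wrapped queue with __getattr__. This is only a sketch, not what I actually ran: the class name DelegatingReadyQueue is made up for illustration, and the guard inside __getattr__ is there on the assumption that the object will be pickled for the child process, in which case lookups can happen before the queue attribute exists.

```python
import ctypes
import multiprocessing


class DelegatingReadyQueue:
    """Hypothetical variant: wraps a multiprocessing.Queue, adds a shared flag."""

    def __init__(self, *args, **kwargs):
        self.queue = multiprocessing.Queue(*args, **kwargs)
        self._ready = multiprocessing.Value(ctypes.c_bool, False)

    def set_ready(self):
        self._ready.value = True

    def set_not_ready(self):
        self._ready.value = False

    def is_ready(self):
        return self._ready.value

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        # Refuse dunder names and 'queue' itself so that a half-built
        # instance (e.g. during unpickling) cannot recurse forever.
        if name.startswith("__") or name == "queue":
            raise AttributeError(name)
        # Everything else (put, get, qsize, empty, ...) goes to the real queue.
        return getattr(self.queue, name)


if __name__ == "__main__":
    q = DelegatingReadyQueue()
    q.set_ready()
    q.put("hello")
    print(q.get(timeout=5), q.is_ready())
```

The trade-off is that the forwarded methods are no longer visible in the class body, so explicit wrappers may still be preferable when only a handful of methods are needed.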
Note that in my first example I neglected to make self.ready a multiprocessing.Value, so I wouldn't have been able to change it across processes. I tested again after fixing that, and it was not the source of the issue.
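A likely explanation for the original error, as far as I can tell from the CPython source: multiprocessing.queues.Queue defines its own __getstate__ and __setstate__, which serialize a fixed tuple of internal fields. On Windows the queue is pickled when it is passed to the child, so any attribute a subclass adds in __init__ is never part of that state. The sketch below reproduces the effect without any processes. Base, LossyChild, KeepingChild and simulate_transfer are hypothetical stand-ins, not the real Queue API, and simulate_transfer only imitates the create-then-__setstate__ steps that pickle performs.

```python
class Base:
    """Hypothetical stand-in for a class with custom pickling (not the real Queue)."""

    def __init__(self, maxsize=0):
        self._maxsize = maxsize

    def __getstate__(self):
        # Only this fixed tuple is serialized; other instance attributes are dropped.
        return (self._maxsize,)

    def __setstate__(self, state):
        (self._maxsize,) = state


class LossyChild(Base):
    def __init__(self):
        super().__init__()
        self.ready = True  # not included in Base.__getstate__


class KeepingChild(Base):
    def __init__(self):
        super().__init__()
        self.ready = True

    def __getstate__(self):
        # Bundle the parent's state together with the extra attribute.
        return (super().__getstate__(), self.ready)

    def __setstate__(self, state):
        parent_state, self.ready = state
        super().__setstate__(parent_state)


def simulate_transfer(obj):
    """Imitate unpickling: build an instance without __init__, then restore state."""
    clone = type(obj).__new__(type(obj))
    clone.__setstate__(obj.__getstate__())
    return clone


if __name__ == "__main__":
    print(hasattr(simulate_transfer(LossyChild()), "ready"))  # False
    print(simulate_transfer(KeepingChild()).ready)            # True
```

Under that assumption, overriding both methods in a Queue subclass in the same way as KeepingChild should carry the extra attribute across, provided the attribute itself can be sent to a child process (a multiprocessing.Value can, when passed at process creation).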