Andrew
Andrew

Reputation: 165

Python multiprocessing Queue child class losing attributes in process

I am trying to implement a child class from the multiprocessing Queue in python. The child class contains a simple Boolean flag "ready". When I send the queue to a new process, the ready attribute is disappearing. The following code demonstrates the problem:

import multiprocessing
import multiprocessing.queues


class ReadyQueue(multiprocessing.queues.Queue):
    def __init__(self, ctx, *args, **kwargs):
        super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
        self.ready = False


def ready_queue(*args, **kwargs):
    return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)


def foo(q):
    print(q.ready)


if __name__ == "__main__":
    my_queue = ready_queue()
    print(my_queue.ready)
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()

With the output:

False
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
    print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'

Upvotes: 0

Views: 351

Answers (1)

Andrew
Andrew

Reputation: 165

I implemented this workaround:

import multiprocessing
from queue import Empty
import time
import ctypes


class ReadyQueue:
    def __init__(self, *args, **kwargs):
        self.queue = multiprocessing.Queue(*args, **kwargs)
        self._ready = multiprocessing.Value(ctypes.c_bool, False)

    def set_ready(self):
        self._ready.value = True

    def set_not_ready(self):
        self._ready.value = False
        self.clear()

    def is_ready(self):
        return self._ready.value

    def clear(self):
        try:
            while True:
                self.queue.get(block=False)
        except Empty:
            pass

    def get(self, block=True, timeout=None):
        return self.queue.get(block, timeout)

    def put(self, obj, block=True, timeout=None):
        return self.queue.put(obj, block, timeout)

    def full(self):
        return self.queue.full()

    def empty(self):
        return self.queue.empty()

    def qsize(self):
        return self.queue.qsize()


def foo(q):
    while q.is_ready():
        time.sleep(1)
        q.put("hello from foo")
    print("q no longer ready, foo loop finished")


if __name__ == "__main__":
    my_queue = ReadyQueue()
    my_queue.set_ready()
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()

    for i in range(2):
        print(my_queue.get())
        time.sleep(2)

    print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
    my_queue.set_not_ready()
    print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))

With the output:

C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
hello from foo
hello from foo
my_queue._ready = True, qsize: 2. Setting not ready..
my_queue._ready = False, qusize: 0
q no longer ready, foo loop finished

Process finished with exit code 0

The workaround is to have my ReadyQueue class not inherit from multiprocessing.queues.Queue but have a queue as an attribute. For convenience I implemented the methods that I need from queue, and they just pass through to the queue attribute. I also implemented a clear method.

Note that in my first example I neglected to make self.ready a multiprocessing.Value, so wouldn't have been able to edit it across processes, but I tested after fixing that and it was not the source of the issue.

Upvotes: 1

Related Questions