Petr Nejedlý

Reputation: 33

Pass a large (multidimensional) NumPy array to a multiprocessing queue

I would like to pass a NumPy array through a multiprocessing queue. The program works with small arrays (20x20), but larger arrays do not work. Ultimately, I would like to pass a 4D tensor with dimensions (100, 1, 16, 12000). I am running Python 3.6 on a Mac.

Code example:

import numpy as np
from multiprocessing import JoinableQueue, Process


class Writer(Process):
    def __init__(self,que):
        Process.__init__(self)
        self.queue=que

    def run(self):
        for i in range(10):
            data=np.random.randn(30,30)
            self.queue.put(data)
            print(i)


class Reader(Process):
    def __init__(self,que):
        Process.__init__(self)
        self.queue=que

    def run(self):
        while not(self.queue.empty()):
            result=self.queue.get()
            print(result)


def main():
    q = JoinableQueue()
    w=Writer(q)
    r=Reader(q)

    w.start()
    w.join()
    print("DONE WRITING")

    r.start()
    r.join()
    print("DONE READING")




if __name__ == "__main__":
    main()

Upvotes: 1

Views: 680

Answers (1)

Vilim Štih

Reputation: 21

Python's multiprocessing queue is unsuitable for large arrays: every array has to be pickled when it is put into the queue and unpickled when it is retrieved, which adds processing and memory overhead.
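To put a rough number on that overhead, here is a back-of-the-envelope sketch. It assumes float64 elements (which is what np.random.randn produces) and ignores the small header that pickling adds; array_nbytes is a helper name made up for this illustration.

```python
import math


def array_nbytes(shape, itemsize=8):
    """Raw size in bytes of a dense array; itemsize=8 corresponds to float64."""
    return math.prod(shape) * itemsize


small = array_nbytes((20, 20))             # the size that works in the question
large = array_nbytes((100, 1, 16, 12000))  # the 4D tensor from the question

print(f"20x20 array: {small / 1e3:.1f} kB")
print(f"4D tensor:   {large / 1e6:.1f} MB")
```

Every put() has to serialise at least that many bytes and every get() has to rebuild them, so a single large tensor costs a few hundred megabytes of copying per round trip.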

I developed a small package that instead uses Python's built-in multiprocessing Array class to store the data, with a queue used in the background to pass around metadata. Unlike other solutions I encountered, it works on Mac, Windows and Linux. You can install it with

pip install arrayqueues

The instructions, source and issues are on GitHub: https://github.com/portugueslab/arrayqueues

For simple use-cases it works as a drop-in replacement for the multiprocessing Queue, with the major difference of having to specify the amount of memory the queue will take.
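The general idea of keeping the data in a shared multiprocessing Array and sending only metadata through a queue can be sketched as follows. This is not the package's actual implementation, only a minimal illustration of the technique; make_shared_buffer and as_ndarray are hypothetical helper names, and the shape is a small stand-in for the real tensor.

```python
import ctypes
from multiprocessing import Array

import numpy as np


def make_shared_buffer(shape):
    """Allocate a shared float64 buffer large enough for one array of `shape`."""
    n_items = int(np.prod(shape))
    return Array(ctypes.c_double, n_items)  # zero-initialised, comes with a lock


def as_ndarray(shared, shape):
    """View the shared buffer as a NumPy array without copying the data."""
    return np.frombuffer(shared.get_obj(), dtype=np.float64).reshape(shape)


shape = (4, 1, 16, 120)  # small stand-in for (100, 1, 16, 12000)
shared = make_shared_buffer(shape)

# A writer process would fill the buffer in place like this.
view = as_ndarray(shared, shape)
view[...] = np.random.randn(*shape)

# A reader holding the same `shared` object sees the same memory; only small
# metadata such as `shape` would need to travel through a regular Queue.
reader_view = as_ndarray(shared, shape)
print(np.shares_memory(view, reader_view))
```

Note that the shared Array has to be handed to each Process when it is created (for example through args); it cannot itself be sent through a queue.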

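Regarding the reader process: as far as I know, queue.empty() is not reliable across processes (the multiprocessing documentation says as much), and the following pattern is encouraged instead:

from queue import Empty  # multiprocessing queues raise the standard queue.Empty (the module is called Queue only in Python 2)

# inside the process
while True:
    try:
        # Wait up to 1 second; without a timeout, get() blocks forever
        # and Empty is never raised.
        item = queue.get(timeout=1)
    except Empty:
        break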
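A fuller sketch of that pattern applied to the question's code is below. It also starts the reader before joining the writer, because the multiprocessing documentation warns that a process which has put items on a queue will not terminate until its buffered items have been flushed to the pipe, so joining it before anything consumes the data can deadlock. That is a plausible reason why only small arrays worked, though this has not been verified on the asker's setup. SENTINEL, writer, drain and reader are names invented for this example.

```python
import queue  # multiprocessing queues raise queue.Empty as well
from multiprocessing import Process, Queue

import numpy as np

SENTINEL = None  # hypothetical end-of-stream marker


def writer(q, n_items, shape):
    for _ in range(n_items):
        q.put(np.random.randn(*shape))
    q.put(SENTINEL)  # tell the reader that nothing else is coming


def drain(q, timeout=5.0):
    """Collect items until the sentinel arrives or nothing shows up for `timeout` seconds."""
    items = []
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            break
        if item is SENTINEL:
            break
        items.append(item)
    return items


def reader(q):
    for arr in drain(q):
        print(arr.shape)


def main():
    q = Queue()
    w = Process(target=writer, args=(q, 10, (30, 30)))
    r = Process(target=reader, args=(q,))
    # Start both processes before joining either one, so the reader
    # consumes the data while the writer is still producing it.
    w.start()
    r.start()
    w.join()
    r.join()


if __name__ == "__main__":
    main()
```

The sentinel lets the reader stop as soon as the writer is done, while the timeout is only a safety net in case the writer dies before sending it.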

Upvotes: 2
