I would like to pass a numpy array through a multiprocessing queue. The program works with small arrays (20x20), but fails with larger ones. Ultimately I would like to pass a 4D tensor with dimensions (100, 1, 16, 12000). Running Python 3.6 on a Mac.
Code example:
import numpy as np
from multiprocessing import JoinableQueue, Process

class Writer(Process):
    def __init__(self, que):
        Process.__init__(self)
        self.queue = que

    def run(self):
        for i in range(10):
            data = np.random.randn(30, 30)
            self.queue.put(data)
            print(i)

class Reader(Process):
    def __init__(self, que):
        Process.__init__(self)
        self.queue = que

    def run(self):
        while not self.queue.empty():
            result = self.queue.get()
            print(result)

def main():
    q = JoinableQueue()
    w = Writer(q)
    r = Reader(q)
    w.start()
    w.join()
    print("DONE WRITING")
    r.start()
    r.join()
    print("DONE READING")

if __name__ == "__main__":
    main()
Upvotes: 1
The Python multiprocessing queue is unsuitable for large arrays: each array has to be pickled when it is put into the queue and unpickled when it is taken out, which introduces both processing and memory overhead.
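To see the cost described above, here is a minimal sketch (the 1000x1000 shape is arbitrary) comparing the size of a pickled array with its raw buffer:

```python
import pickle

import numpy as np

# An 8 MB array (1000 * 1000 float64 values).
arr = np.random.randn(1000, 1000)

raw_bytes = arr.nbytes
pickled_bytes = len(pickle.dumps(arr, protocol=pickle.HIGHEST_PROTOCOL))

# The pickled blob contains a full copy of the buffer, so it is at least
# as large as the raw data; queue.put() pays this serialization cost for
# every item, and queue.get() pays it again on deserialization.
print(raw_bytes, pickled_bytes)
```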
I developed a small package which instead uses the built-in Python multiprocessing Array class to store the data; a queue is used in the background only to pass metadata around. Unlike other solutions I have encountered, it works on Mac, Windows and Linux. You can install it with
pip install arrayqueues
The instructions, source and issues are on github: https://github.com/portugueslab/arrayqueues
For simple use-cases it works as a drop-in replacement for the multiprocessing Queue, with the major difference of having to specify the amount of memory the queue will take.
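The underlying idea — keeping the array data in shared memory so nothing is pickled — can be sketched with the standard library alone (the names below are illustrative, not arrayqueues' actual API):

```python
import multiprocessing as mp

import numpy as np

# The array data lives in a multiprocessing.Array (shared memory);
# numpy views that buffer directly instead of copying it.
shape = (30, 30)
shared = mp.Array('d', int(np.prod(shape)))  # 'd' = C double, i.e. float64

def as_numpy(shared_arr, shape):
    # np.frombuffer creates a view over the shared buffer: no pickling
    # and no copy, so a write in one process is visible in another.
    return np.frombuffer(shared_arr.get_obj(), dtype=np.float64).reshape(shape)

view = as_numpy(shared, shape)
view[:] = np.random.randn(*shape)  # fill the shared buffer in place
```

A queue then only needs to carry small metadata (which slot holds an array, its shape and dtype) rather than the array itself.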
Regarding the reader process: as far as I know, queue.empty() is considered unreliable, and the following pattern is encouraged instead:
from queue import Empty  # in Python 3 the Empty exception lives in the lowercase queue module

# inside the reader process
while True:
    try:
        # get() only raises Empty when given a timeout (or with get_nowait())
        item = queue.get(timeout=1)
    except Empty:
        break
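As a self-contained sketch of this drain pattern (single process for brevity; the shapes, item count and timeout are arbitrary):

```python
from multiprocessing import Queue
from queue import Empty

import numpy as np

q = Queue()
for _ in range(5):
    q.put(np.random.randn(4, 4))

# Drain with the timeout-based pattern instead of trusting q.empty(),
# which can report empty while the feeder thread is still flushing items.
results = []
while True:
    try:
        results.append(q.get(timeout=0.5))
    except Empty:
        break

print(len(results))
```

The same loop body works unchanged inside a Reader process like the one in the question.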
Upvotes: 2