erik

Reputation: 659

Put large ndarrays quickly into a multiprocessing.Queue

When trying to put a large ndarray into a Queue from a Process, I encounter the following problem:

First, here is the code:

import numpy
import multiprocessing
from ctypes import c_bool
import time


def run(acquisition_running, data_queue):
    while acquisition_running.value:
        length = 65536
        data = numpy.ndarray(length, dtype='float')

        data_queue.put(data)
        time.sleep(0.1)


if __name__ == '__main__':
    acquisition_running = multiprocessing.Value(c_bool)

    data_queue = multiprocessing.Queue()

    process = multiprocessing.Process(
        target=run, args=(acquisition_running, data_queue))

    acquisition_running.value = True
    process.start()
    time.sleep(1)
    acquisition_running.value = False
    process.join()

    print('Finished')

    number_items = 0

    while not data_queue.empty():
        data_item = data_queue.get()
        number_items += 1

    print(number_items)

  1. If I use length=10 or so, everything works fine. I get 9 items transmitted through the Queue.

  2. If I increase to length=1000, on my computer process.join() blocks, although the function run() has already finished. I can comment out the line with process.join() and see that only 2 items were put into the Queue, so apparently putting data into the Queue became very slow.

My plan is actually to transport 4 ndarrays, each with length 65536. With a Thread this worked very fast (<1 ms). Is there a way to improve the speed of transmitting data between processes?

I used Python 3.4 on a Windows machine, but with Python 3.4 on Linux I get the same behavior.

Upvotes: 4

Views: 6425

Answers (4)

Marcos de Souza

Reputation: 1

Convert the array/list to a string with str(your_array):

q.put(str(your_array))

Upvotes: -3

StatsNoob

Reputation: 390

One thing you could do to resolve that issue, in tandem with the excellent answer from JPG, is to drain your Queue before joining the process.

So do this instead:

process.start()
data_item = data_queue.get()
process.join()

While this does not fully replicate the behavior of the original code (counting the number of items), you get the idea ;)
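
For illustration, here is a minimal sketch of that idea applied to the code from the question (the names come from the question; the 0.5 s timeout is an arbitrary assumption): the parent keeps draining the queue until the child has exited and nothing is left, and only then calls join().

import queue

process.start()
time.sleep(1)
acquisition_running.value = False

# Keep emptying the queue until the child has exited and no items remain,
# so the feeder thread can flush its buffer and join() does not block.
number_items = 0
while True:
    try:
        data_item = data_queue.get(timeout=0.5)
        number_items += 1
    except queue.Empty:
        if not process.is_alive():
            break

process.join()
print(number_items)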

Upvotes: -1

Mike McKerns

Reputation: 35217

If you have really big arrays, you might want to only pass their pickled state -- or a better alternative might be to use multiprocessing.Array or multiprocessing.sharedctypes.RawArray to make a shared memory array (for the latter, see http://briansimulator.org/sharing-numpy-arrays-between-processes/). You have to worry about conflicts, as you'll have an array that's not bound by the GIL -- and needs locks. However, you only need to send array indices to access the shared array data.
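
For illustration, a minimal sketch of the shared-memory variant with multiprocessing.Array (the Event-based signalling, the 'd' typecode and the array length are my own assumptions, not something from the question):

import multiprocessing
import numpy

LENGTH = 65536


def run(shared_arr, ready):
    # View the shared buffer as an ndarray -- no copying, no pickling.
    data = numpy.frombuffer(shared_arr.get_obj(), dtype='float64')
    data[:] = numpy.random.random(LENGTH)  # write directly into shared memory
    ready.set()


if __name__ == '__main__':
    shared_arr = multiprocessing.Array('d', LENGTH)  # 'd' = C double, comes with a lock
    ready = multiprocessing.Event()

    process = multiprocessing.Process(target=run, args=(shared_arr, ready))
    process.start()
    ready.wait()  # child signals that the data is in place

    with shared_arr.get_lock():  # guard against concurrent access
        result = numpy.frombuffer(shared_arr.get_obj(), dtype='float64').copy()

    process.join()
    print(result.shape)

Only the fixed-size shared buffer is allocated once; the processes then exchange nothing but small notifications and indices, which is why this approach scales to large arrays.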

Upvotes: 0

Dr. Jan-Philip Gehrcke

Reputation: 35761

"Is there a way to improve speed of transmitting data for processes?"

Surely, given the right problem to solve. Currently, you are just filling a buffer without emptying it simultaneously. Congratulations, you have just built yourself a so-called deadlock. The corresponding quote from the documentation is:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe.

But, let's approach this slowly. First of all, "speed" is not your problem! I understand that you are just experimenting with Python's multiprocessing. The most important insight when reading your code is that the flow of communication between parent and child and especially the event handling does not really make sense. If you have a real-world problem that you are trying to solve, you definitely cannot solve it this way. If you do not have a real-world problem, then you first need to come up with a good one before you should start writing code ;-). Eventually, you will need to understand the communication primitives an operating system provides for inter-process communication.

Explanation for what you are observing:

Your child process generates about 10 * length * size(float) bytes of data (considering the fact that your child process can perform about 10 iterations while your parent sleeps about 1 s before it sets acquisition_running to False). While your parent process sleeps, the child puts said amount of data into a queue. You need to appreciate that a queue is a complex construct. You do not need to understand every bit of it. But one thing really, really is important: a queue for inter-process communication clearly uses some kind of buffer* that sits between parent and child. Buffers usually have a limited size. You are writing to this buffer from within the child without simultaneously reading from it in the parent. That is, the buffer contents steadily grow while the parent is just sleeping. By increasing length you run into the situation where the queue buffer is full and the child process cannot write to it anymore. However, the child process cannot terminate before it has written all data. At the same time, the parent process waits for the child to terminate.

You see? One entity waits for the other. The parent waits for the child to terminate and the child waits for the parent to make some space. Such a situation is called deadlock. It cannot resolve itself.

Regarding the details, the buffer situation is a little more complex than described above. Your child process has spawned an additional thread that tries to push the buffered data through a pipe to the parent. Actually, the buffer of this pipe is the limiting entity. It is defined by the operating system and, at least on Linux, is usually not larger than 65536 bytes.

The essential part is, in other words: the parent does not read from the pipe before the child finishes attempting to write to the pipe. In every meaningful scenario where pipes are used, reading and writing happen in a rather simultaneous fashion so that one process can quickly react to input provided by another process. You are doing the exact opposite: you put your parent to sleep and therefore render it unresponsive to input from the child, resulting in a deadlock situation.

(*) "When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe", from https://docs.python.org/2/library/multiprocessing.html

Upvotes: 11
