Bipradeep Saha

Reputation: 25

Adding arrays to global array using multiprocessing

I have a global NumPy array ys_final and have defined a function that generates an array ys. The ys array is generated based on an input parameter, and I want to add these ys arrays to the global array, i.e. ys_final = ys_final + ys.
The order of addition doesn't matter, so I want to use Pool.apply_async() from the multiprocessing library, but I can't write to the global array. The code for reference is:

import multiprocessing as mp
import numpy as np

ys_final = np.zeros(len)
def ys_genrator(i):
    #code to generate ys array
    return ys
pool = mp.Pool(mp.cpu_count())
for i in range(3954):
    ys_final = ys_final + pool.apply_async(ys_genrator, args=(i)).get()
pool.close()
pool.join()

The above block of code keeps running forever and nothing happens. I've also tried mp.Process and I face the same problem. There I defined a target function that directly adds to the global array, but it is also not working, as the block keeps running forever. Reference:

def func(i):
    #code to generate ys
    global ys_final
    ys_final = ys_final + ys

for i in range(3954):
    p = mp.Process(target=func, args=(i,))
    p.start()
    p.join()

Any suggestions will be really helpful.

EDIT: My ys_genrator is a function for linear interpolation. Based on the parameter i, which is an index for rows in a 2D image, the function creates an array of interpolated amplitudes that will be superimposed with all the interpolated amplitudes from the image, so ys needs to be added to ys_final. The variable len is the length of the interpolated array, which is the same for all rows.

For reference, a simpler version of ys_genrator(i) is as follows:

def ys_genrator(i):
    ys = np.ones(10)*i
    return ys

Upvotes: 0

Views: 559

Answers (1)

Booboo

Reputation: 44108

A few points:

  1. pool.apply_async(ys_genrator, args=(i)) needs to be pool.apply_async(ys_genrator, args=(i,)). Note the comma after the i.
  2. pool.apply_async(ys_genrator, args=(i,)).get() is exactly equivalent to pool.apply(ys_genrator, args=(i,)). That is, you will block because of your immediate call to get and you will have absolutely no parallelism. You would need to do all your calls to pool.apply_async, save the returned AsyncResult instances, and only then call get on these instances (see the sketch after this list).
  3. If you are running under Windows, you will have a problem. The code that creates new processes must be within a block governed by if __name__ == '__main__':
  4. If you are running under something like Jupyter Notebook or IPython, you will have a problem. The worker function, ys_genrator, would need to be in an external file and imported.
  5. Using apply_async for submitting a lot of tasks is inefficient. You are better off using imap or imap_unordered, where the tasks get submitted in "chunks" and you can process the results one by one as they become available. But you must choose a "suitable" chunksize argument.
  6. Any code you have at the global level, such as ys_final = np.zeros(len), will be executed by every sub-process if you are running under Windows, and this can be wasteful if the subprocesses do not need to "see" this variable. If they do need to see this variable, be aware that each process in the pool will be working with its own copy of the variable, so it had better be read-only usage. Even then, it can be very wasteful of storage if the variable is large. There are ways of sharing such a variable across the processes, but it is not perfectly clear whether you need to (you haven't even defined the variable len). So it is difficult to give you improved code. However, it appears that your worker function does not need to "see" ys_final, so I will take a shot at an improved solution.
  7. But be aware that if your function ys_genrator is very trivial, nothing will be gained by using multiprocessing because there is overhead in both creating the processing pool and in passing arguments from one process to another. Also, if ys_genrator is using numpy, this can also be a source of problems since numpy uses multiprocessing for some of its own functions and you are better off not mixing numpy with your own multiprocessing.
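To make point 2 concrete, here is a minimal sketch of the apply_async route with every task submitted before any call to get. The SIZE constant and the dummy body of ys_genrator are placeholders, not the real interpolation code:

import multiprocessing as mp
import numpy as np

SIZE = 3  # placeholder length of each generated array

def ys_genrator(i):
    # dummy stand-in for the real interpolation code
    return np.ones(SIZE) * i

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    with mp.Pool(mp.cpu_count()) as pool:
        # submit every task first, then collect the results afterwards
        async_results = [pool.apply_async(ys_genrator, args=(i,)) for i in range(3954)]
        for async_result in async_results:
            ys_final += async_result.get()
    print(ys_final)

This still incurs one task submission per call to apply_async, which is why the chunked imap_unordered solution below is preferable:
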
import multiprocessing as mp
import numpy as np

SIZE = 3

def ys_genrator(i):
    #code to generate ys array
    # for this dummy example all SIZE entries will end up with the same result:
    ys = [i] * SIZE # for example: [1, 1, 1]
    return ys

def compute_chunksize(poolsize, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    n_iterations = 3954
    poolsize = min(mp.cpu_count(), n_iterations)
    chunksize = compute_chunksize(poolsize, n_iterations)
    print('poolsize =', poolsize, 'chunksize =', chunksize)
    pool = mp.Pool(poolsize)
    for result in pool.imap_unordered(ys_genrator, range(n_iterations), chunksize):
        ys_final += result
    print(ys_final)

Prints:

poolsize = 8 chunksize = 124
[7815081. 7815081. 7815081.]

Update

You can also just use:

    for result in pool.map(ys_genrator, range(n_iterations)):
        ys_final += result

The issue is that when you use the map method, it wants to compute an efficient chunksize argument based on the size of its iterable argument (see my compute_chunksize function above, which is essentially what pool.map will use). But to do this, it will have to first convert the iterable to a list to get its size. If n_iterations is very large, this is not very efficient, although it's probably not a major issue for a size of 3954. Still, you would be better off using my compute_chunksize function in this case, since you know the size of the iterable, and then pass the chunksize argument explicitly to map, as I have done in the code using imap_unordered.
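
For example, a sketch of that variant, reusing the chunksize already computed with compute_chunksize in the full example above (it assumes the same pool, poolsize, and n_iterations and simply replaces the imap_unordered loop):

    # pass the precomputed chunksize explicitly instead of letting map derive it
    for result in pool.map(ys_genrator, range(n_iterations), chunksize=chunksize):
        ys_final += result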

Upvotes: 1
