Reputation: 25
I have a global NumPy array ys_final and have defined a function that generates an array ys. The ys array is generated based on an input parameter, and I want to add each of these ys arrays to the global array, i.e. ys_final = ys_final + ys.
The order of addition doesn't matter, so I want to use Pool.apply_async() from the multiprocessing library, but I can't write to the global array. The code for reference is:
import multiprocessing as mp
import numpy as np

ys_final = np.zeros(len)

def ys_genrator(i):
    # code to generate ys array
    return ys

pool = mp.Pool(mp.cpu_count())
for i in range(3954):
    ys_final = ys_final + pool.apply_async(ys_genrator, args=(i)).get()
pool.close()
pool.join()
The above block of code keeps running forever and nothing happens. I have also tried mp.Process, where I defined a target function that adds directly to the global array, but I face the same problem: the block keeps running forever. Reference:
def func(i):
    # code to generate ys
    global ys_final
    ys_final = ys_final + ys

for i in range(3954):
    p = mp.Process(target=func, args=(i,))
    p.start()
    p.join()
Any suggestions will be really helpful.
EDIT:
My ys_genrator is a function for linear interpolation. Based on the parameter i, which is an index for rows in a 2D image, the function creates an array of interpolated amplitudes that will be superimposed with all the other interpolated amplitudes from the image, which is why each ys needs to be added to ys_final. The variable len is the length of the interpolated array, which is the same for all rows. For reference, a simpler version of ys_genrator(i) is as follows:
def ys_genrator(i):
    ys = np.ones(10) * i
    return ys
Upvotes: 0
Views: 559
Reputation: 44108
A few points:

1. pool.apply_async(ys_genrator, args=(i)) needs to be pool.apply_async(ys_genrator, args=(i,)). Note the comma after the i.
2. pool.apply_async(ys_genrator, args=(i,)).get() is exactly equivalent to pool.apply(ys_genrator, args=(i,)). That is, you will block because of your immediate call to get and you will have absolutely no parallelism. You would need to make all your calls to pool.apply_async first, save the returned AsyncResult instances, and only then call get on those instances (a short sketch of this pattern appears after the output below).
3. The code that creates the pool and submits the tasks needs to be inside a block governed by if __name__ == '__main__':, and if you are running in an interactive session, ys_genrator would need to be in an external file and imported.
4. Using apply_async for submitting a lot of tasks is inefficient. You are better off using imap or imap_unordered, where the tasks get submitted in "chunks" and you can process the results one by one as they become available. But you must choose a "suitable" chunksize argument.
5. ys_final = np.zeros(len) will be executed by every sub-process if you are running under Windows, and this can be wasteful if the subprocesses do not need to "see" this variable. If they do need to see it, be aware that each process in the pool will be working with its own copy of the variable, so it had better be a read-only usage. Even then, it can be very wasteful of storage if the variable is large. There are ways of sharing such a variable across the processes, but it is not perfectly clear whether you need to (you haven't even defined the variable len). So it is difficult to give you improved code. However, it appears that your worker function does not need to "see" ys_final, so I will take a shot at an improved solution.
6. If ys_genrator is very trivial, nothing will be gained by using multiprocessing because there is overhead both in creating the processing pool and in passing arguments from one process to another. Also, if ys_genrator is using numpy, this can also be a source of problems since numpy uses multiprocessing for some of its own functions and you are better off not mixing numpy with your own multiprocessing.

import multiprocessing as mp
import numpy as np
SIZE = 3
def ys_genrator(i):
    # code to generate ys array
    # for this dummy example all SIZE entries will end up with the same result:
    ys = [i] * SIZE  # for example: [1, 1, 1]
    return ys

def compute_chunksize(poolsize, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    n_iterations = 3954
    poolsize = min(mp.cpu_count(), n_iterations)
    chunksize = compute_chunksize(poolsize, n_iterations)
    print('poolsize =', poolsize, 'chunksize =', chunksize)
    pool = mp.Pool(poolsize)
    for result in pool.imap_unordered(ys_genrator, range(n_iterations), chunksize):
        ys_final += result
    print(ys_final)
Prints:
poolsize = 8 chunksize = 124
[7815081. 7815081. 7815081.]
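As a side note on point 2 above, here is a minimal sketch of what the non-blocking apply_async pattern could look like: submit every task first, keep the returned AsyncResult instances, and only then call get. The dummy ys_genrator is the same as in the code above; for this many small tasks, imap_unordered with a suitable chunksize is still the better fit.
import multiprocessing as mp
import numpy as np

SIZE = 3

def ys_genrator(i):
    # same dummy worker as above
    return [i] * SIZE

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    with mp.Pool(mp.cpu_count()) as pool:
        # submit all tasks first; apply_async returns an AsyncResult immediately
        async_results = [pool.apply_async(ys_genrator, args=(i,)) for i in range(3954)]
        # only now block on the results, one by one
        for async_result in async_results:
            ys_final += async_result.get()
    print(ys_final)  # [7815081. 7815081. 7815081.]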
Update
You can also just use:
for result in pool.map(ys_genrator, range(n_iterations)):
    ys_final += result
The issue is that when you use the map method, it wants to compute an efficient chunksize argument based on the size of the iterable argument (see my compute_chunksize function above, which is essentially what pool.map will use). But to do this, it will have to first convert the iterable to a list to get its size. If n_iterations is very large, this is not very efficient, although it's probably not a major issue for a size of 3954. Still, you would be better off using my compute_chunksize function in this case, since you know the size of the iterable, and then pass the chunksize argument explicitly to map, as I have done in the code using imap_unordered.
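For example, here is a minimal sketch of that variant, with the chunksize computed up front and passed explicitly to map; ys_genrator and compute_chunksize are the same as in the code above.
import multiprocessing as mp
import numpy as np

SIZE = 3

def ys_genrator(i):
    # same dummy worker as above
    return [i] * SIZE

def compute_chunksize(poolsize, iterable_size):
    # same helper as above
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    n_iterations = 3954
    poolsize = min(mp.cpu_count(), n_iterations)
    chunksize = compute_chunksize(poolsize, n_iterations)
    with mp.Pool(poolsize) as pool:
        # map blocks until every result is back, in submission order
        for result in pool.map(ys_genrator, range(n_iterations), chunksize=chunksize):
            ys_final += result
    print(ys_final)  # [7815081. 7815081. 7815081.]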
Upvotes: 1