Reputation: 942
I've been reading threads like this one but any of them seems to work for my case. I'm trying to parallelize the following toy example to fill a Numpy array inside a for loop using Multiprocessing in Python:
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
for idx, val in enumerate(range(1, n+1)):
result = pool.apply_async(func1, [val])
my_array[idx] = result.get()
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(60000, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
The lines in the code based on Multiprocessing seem to work and give you the right results. However, it takes far longer than the non parallelized version. Here is the output of both versions:
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Normal time: 0.01605963706970215
[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
3.60000e+09]
Time based on multiprocessing: 2.8775112628936768
My intuition tells me that it should be a better way of capturing results from pool.apply_async(). What am I doing wrong? What is the most efficient way to accomplish this? Thx.
Upvotes: 3
Views: 6211
Reputation: 942
Here is the solution proposed by @thisisalsomypassword that improves my initial proposal. That is, "collecting the AsyncResult
objects in a list within the loop and then calling AsyncResult.get()
after all processes have started on each result object":
import numpy as np
from multiprocessing import Pool
import time
def func1(x, y=1):
time.sleep(0.1)
return x**2 + y
def func2(n, parallel=False):
my_array = np.zeros((n))
# Parallelized version:
if parallel:
pool = Pool(processes=6)
####### HERE COMES THE CHANGE #######
results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
for idx, val in enumerate(results):
my_array[idx] = val.get()
#######
pool.close()
# Not parallelized version:
else:
for i in range(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(600)
end = time.time()
print(my_array)
print("Normal time: {}\n".format(end-start))
start_parallel = time.time()
my_array_parallelized = func2(600, parallel=True)
end_parallel = time.time()
print(my_array_parallelized)
print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))
if __name__ == '__main__':
main()
Now it works. Time is reduced considerably with Multiprocessing:
Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848
time.sleep(0.1)
was added in func1
to cancel out the effect of being a super trivial task.
Upvotes: 1
Reputation: 50358
Creating processes is expensive. On my machine it take at leas several hundred of microsecond per process created. Moreover, the multiprocessing module copy the data to be computed between process and then gather the results from the process pool. This inter-process communication is very slow too. The problem is that your computation is trivial and can be done very quickly, likely much faster than all the introduced overhead. The multiprocessing module is only useful when you are dealing with quite small datasets and perform intensive computation (compared to the amount of computed data).
Hopefully, when it comes to numericals computations using Numpy, there is a simple and fast way to parallelize your application: the Numba JIT. Numba can parallelize a code if you explicitly use parallel structures (parallel=True
and prange
). It uses threads and not heavy processes that are working in shared memory. Numba can overcome the GIL if your code does not deal with native types and Numpy arrays instead of pure Python dynamic object (lists, big integers, classes, etc.). Here is an example:
import numpy as np
import numba as nb
import time
@nb.njit
def func1(x, y=1):
return x**2 + y
@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
my_array = np.zeros(n)
for i in nb.prange(1, n+1):
my_array[i-1] = func1(i)
return my_array
def main():
start = time.time()
my_array = func2(60000)
end = time.time()
print(my_array)
print("Numba time: {}\n".format(end-start))
if __name__ == '__main__':
main()
Because Numba compiles the code at runtime, it is able to fully optimize the loop to a no-op resulting in a time close to 0 second in this case.
Upvotes: 5