Antonio Serrano

Reputation: 942

Multiprocessing in Python: Parallelize a for loop to fill a Numpy array

I've been reading threads like this one, but none of them seems to work for my case. I'm trying to parallelize the following toy example, which fills a Numpy array inside a for loop, using multiprocessing in Python:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(60000, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

The multiprocessing-based code seems to work and gives the right results. However, it takes far longer than the non-parallelized version. Here is the output of both versions:

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Normal time: 0.01605963706970215

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Time based on multiprocessing: 2.8775112628936768

My intuition tells me that there should be a better way of capturing the results from pool.apply_async(). What am I doing wrong? What is the most efficient way to accomplish this? Thanks.

Upvotes: 3

Views: 6211

Answers (2)

Antonio Serrano

Reputation: 942

Here is the solution proposed by @thisisalsomypassword, which improves on my initial attempt. That is, "collecting the AsyncResult objects in a list within the loop and then calling AsyncResult.get() on each result object after all tasks have been submitted":

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    time.sleep(0.1)
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        ####### HERE COMES THE CHANGE #######
        results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
        for idx, val in enumerate(results):
            my_array[idx] = val.get()
        #######
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(600)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(600, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

Now it works. The time is reduced considerably with multiprocessing:

Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848    

time.sleep(0.1) was added to func1 to cancel out the effect of the task being trivially cheap; otherwise the multiprocessing overhead would still dominate.
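
For completeness, a further variant (not part of the accepted fix; func2_map is a hypothetical name) would be Pool.map with an explicit chunksize, which batches many arguments per inter-process message and usually cuts the per-task overhead even more than issuing one apply_async per element:

import numpy as np
from multiprocessing import Pool

def func1(x, y=1):
    return x**2 + y

def func2_map(n, processes=6, chunksize=1000):
    # Pool.map preserves input order, so the results can be turned into an
    # array directly, without any index bookkeeping.
    with Pool(processes=processes) as pool:
        return np.array(pool.map(func1, range(1, n + 1), chunksize=chunksize))

if __name__ == '__main__':
    print(func2_map(600))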

Upvotes: 1

Jérôme Richard

Reputation: 50358

Creating processes is expensive. On my machine, it takes at least several hundred microseconds per process created. Moreover, the multiprocessing module copies the data to be computed between processes and then gathers the results from the process pool. This inter-process communication is very slow too. The problem is that your computation is trivial and can be done very quickly, likely much faster than all the overhead that is introduced. The multiprocessing module is only useful when you are dealing with fairly small datasets and performing intensive computation (compared to the amount of computed data).
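
As a rough illustration of those fixed costs (a minimal sketch, not taken from the question), timing the pool start-up and a single trivial task separately gives a feel for how both compare with a microsecond-scale computation:

from multiprocessing import Pool
import time

if __name__ == '__main__':
    start = time.time()
    pool = Pool(processes=6)      # spawning/forking the worker processes
    print("Pool start-up: {:.4f} s".format(time.time() - start))

    start = time.time()
    pool.apply(pow, (2, 10))      # one tiny task: the time is almost pure inter-process communication
    print("Single trivial task: {:.6f} s".format(time.time() - start))

    pool.close()
    pool.join()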

Fortunately, when it comes to numerical computations with Numpy, there is a simple and fast way to parallelize your application: the Numba JIT. Numba can parallelize code if you explicitly use its parallel constructs (parallel=True and prange). It uses threads working in shared memory rather than heavyweight processes. Numba can bypass the GIL as long as your code works on native types and Numpy arrays instead of pure-Python dynamic objects (lists, big integers, classes, etc.). Here is an example:

import numpy as np
import numba as nb
import time

@nb.njit
def func1(x, y=1):
    return x**2 + y

@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n+1):
        my_array[i-1] = func1(i)
    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Numba time: {}\n".format(end-start))

if __name__ == '__main__':
    main()

Because Numba compiles the code at runtime, it is able to fully optimize the loop, resulting in a time close to 0 seconds in this case.
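
As a usage note, the explicit signature in @nb.njit('float64[:](int64)', parallel=True) makes Numba compile the function eagerly, at definition time, so the timed call above measures execution only. With a lazily compiled variant (a minimal sketch; func2_lazy is a hypothetical name), a warm-up call is needed so that compilation time is not counted in the measurement:

import numpy as np
import numba as nb
import time

@nb.njit(parallel=True)          # no signature: compiled lazily, on the first call
def func2_lazy(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n + 1):
        my_array[i - 1] = i**2 + 1   # same computation as func1(i)
    return my_array

func2_lazy(10)                   # warm-up call triggers (and pays for) compilation

start = time.time()
func2_lazy(60000)
print("Numba time, compilation excluded: {}".format(time.time() - start))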

Upvotes: 5
