velenos14

Reputation: 566

mp.Process and queue slower than serial counterpart

I have a Python code that I want to make run faster.

The code, at multiple locations, performs work on two 3-dimensional NumPy arrays using the np.einsum() function; the two arrays are the inputs to np.einsum(). These operations are embarrassingly parallel: instead of the full second array, np.einsum() can also take a slice of it along one axis (together with the full first array), which produces just the corresponding part of the result that the full tensor - full tensor contraction would have given. Assembling all of these partial results recreates the full result exactly.
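
To make the idea concrete, here is a minimal sketch (the array names and shapes are made up for illustration, not the ones from my actual code): slicing the second array along its first axis, running np.einsum() on each slice and concatenating the partial results gives exactly the full result.

import numpy as np

A = np.random.rand(10, 7, 10)   # stand-in for the first array
B = np.random.rand(5, 7, 10)    # stand-in for the second array

full = np.einsum('ijk,rjk->rij', A, B)

# the same result, built from two slices of B along its first axis
parts = [np.einsum('ijk,rjk->rij', A, B[:3]),
         np.einsum('ijk,rjk->rij', A, B[3:])]
np.testing.assert_allclose(np.concatenate(parts), full)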

I initially tried to spawn child threads and have them return their part of the final result to the main thread, but I read that, because the GIL is not released, this is concurrent rather than parallel programming. I then moved to the multiprocessing module and now do the same thing with multiprocessing.Process. Because I read that pickling a not-small array takes a lot of time, I only pass to the target function the indices of the second array between which the spawned process needs to work. Please see the code below. Unfortunately, the serial execution of the code is much faster than my attempt to parallelize it.

How can I make it run, well, somewhere close to 4 times faster than the serial version?

Thank you!

import multiprocessing as mp
import numpy as np
import time

N_rs = 3000
N_thetas = 70

numberOfProcesses = 4

Lobatto_matr = np.random.rand ( N_rs-1,  N_thetas,  N_rs-1 )
Legendre_matr = np.random.rand( N_thetas,  2*N_thetas - 1,  N_thetas  )
after_first_int = np.random.rand( N_rs - 1,  2*N_thetas - 1,  N_thetas )
after_second_int = np.random.rand(2*N_thetas-1,  N_thetas,  N_rs-1)

def perform_3rdInt_in_getCilmoftwithMatrs(que, list_of_idxs):
    # after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int_as_input_for_this_thread)
    if list_of_idxs[1] is None:  # last process: take everything from the start index onward
        after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int[list_of_idxs[0]:, :, :])
    else:
        after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int[list_of_idxs[0] : list_of_idxs[1], :, :])
    # return after_third_int
    que.put(after_third_int)


if __name__ == "__main__":
    t0 = time.time()
    after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int)
    t1 = time.time()
    print(t1 - t0)

    t55 = time.time()
    ps = []
    divisor = after_second_int.shape[0] // numberOfProcesses  # = (2*N_thetas - 1) // numberOfProcesses
    q = mp.Queue()
    for i in range(numberOfProcesses):
        if i != (numberOfProcesses-1):
            list_of_idxs = [i*divisor, (i+1)*divisor]
            p = mp.Process(target=perform_3rdInt_in_getCilmoftwithMatrs, args=(q, list_of_idxs, ))
            ps.append(p)
            p.start()
        else:  # i == numberOfProcesses - 1: the last process receives the remainder, i.e. the most work
            list_of_idxs = [i*divisor, None]
            p = mp.Process(target=perform_3rdInt_in_getCilmoftwithMatrs, args=(q, list_of_idxs, ))
            ps.append(p)
            p.start()

    rets = []
    for p in ps:
        rets.append(q.get())
    for p in ps:
        p.join()

    after_third_int = np.concatenate(rets)
    t66 = time.time()

    print(t66 - t55)

I get as output:

$ python3 test_processes.py 
67.87767100334167
278.10538840293884

Later edit: I have also tried map from mp.Pool, but I still do not get a speed-up when going parallel, and the difference in execution times (parallel_code - serial_code) grows as the matrices get bigger, which is the opposite of what I would expect!

def perform_3rdInt_in_getCilmoftwithMatrs(indices):
    if indices[1] is None:  # last chunk: take everything from the start index onward
        after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int[indices[0]: , :, :])
    else:
        after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int[indices[0] : indices[1], :, :])
    return after_third_int

if __name__ == "__main__":
    t0 = time.time()
    after_third_int_1 = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int)
    t1 = time.time()
    print(t1 - t0)

    t55 = time.time()

    ps = []
    divisor = after_second_int.shape[0] // numberOfProcesses  # = (2*N_thetas - 1) // numberOfProcesses
    indices_for_map = []
    for i in range(numberOfProcesses):
        if i != (numberOfProcesses-1):
            indices_for_map.append([i*divisor, (i+1)*divisor]) 
        else:
            indices_for_map.append([i*divisor, None]) 

    with mp.Pool(processes=3) as pool:
        results = pool.map(perform_3rdInt_in_getCilmoftwithMatrs, indices_for_map)
    t555 = time.time()
    print(t555 - t55)

    after_third_int_2 = np.concatenate(results)

    t66 = time.time()
    print(t66 - t55)

Upvotes: 0

Views: 62

Answers (1)

dsm

Reputation: 2253

You can rather easily do this with dask arrays:

import numpy as np

N_rs = 300 # reduced for demonstration - and my battery life's - sake ;)
N_thetas = 70

Lobatto_matr = np.random.rand ( N_rs-1,  N_thetas,  N_rs-1 )
after_second_int = np.random.rand(2*N_thetas-1,  N_thetas,  N_rs-1)
after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int)

Let's first turn our inputs into dask arrays:

from dask import array as da
Lobatto_matr_da = da.array(Lobatto_matr).persist()
after_second_int_da = da.array(after_second_int).persist()
# persist stores the inputs in memory
after_third_int_da = da.einsum('ijk,rjk->rij', Lobatto_matr_da, after_second_int_da)

# sanity check that we get the same result
np.testing.assert_allclose(after_third_int_da.compute(), after_third_int)

And now let's do a quick timing run in IPython:

>>> %timeit after_third_int_da = da.einsum('ijk,rjk->rij', Lobatto_matr_da, after_second_int_da).compute()
1.37 s ± 338 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>> %timeit after_third_int = np.einsum('ijk,rjk->rij', Lobatto_matr, after_second_int)
1.13 s ± 181 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

You might think, hey, but that's slower, no? Yeah, absolutely: right now the arrays are small enough to live in a single chunk, so there's no difference from the NumPy version except for some additional Dask overhead. But let's parallelize this via dask.distributed, by running these two lines before the actual computation:

from dask.distributed import Client
client = Client(n_workers=4)  # set up local cluster on your machine
>>> %timeit after_third_int_da = da.einsum('ijk,rjk->rij', Lobatto_matr_da, after_second_int_da).compute()
62.1 ms ± 3.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Which is quite a bit faster!
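
As an aside on the single-chunk point above: you can also give Dask several independent blocks to schedule by chunking the larger input explicitly when you build the dask array. A sketch (the chunk size along the first axis is arbitrary; whether this helps with the default threaded scheduler depends on how much of the einsum releases the GIL, but with the distributed client it gives the workers separate pieces to work on):

after_second_int_da = da.from_array(after_second_int,
                                    chunks=(35, N_thetas, N_rs - 1)).persist()
after_third_int_da = da.einsum('ijk,rjk->rij', Lobatto_matr_da, after_second_int_da)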

Upvotes: 1
