DrSpill

Reputation: 572

How to optimize Python code to use Multiprocessing

I have Python code that rebins a NumPy array to downsample it. When the input array is large, the full intermediate array does not fit in memory, so the rebinning has to be done in a loop over smaller windows of the data.

What I am looking for is a way to use multiprocessing to speed up the rebin function, which currently runs on a single CPU. I have some knowledge of the multiprocessing module, but I would appreciate advice on converting the code below to use it.

NOTE: I appreciate recommendations (probably more Pythonic ones) that speed up the function below in a different way. However, I am also keen to learn how to adapt the code below for multiprocessing.

MWE:

import numpy as np
from tqdm.auto import tqdm

def rebin(arr, new_shape):
    shape = (arr.shape[0], new_shape[0], arr.shape[1] // new_shape[0],
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)


def rebinner(arr, window):
    # window is the number of rows rebinned per iteration; the full
    # outer-product array cannot fit in memory when the input is large.
    y = []
    for j in tqdm(range(0, arr.shape[0], window)):
        y.append(rebin(arr[j:j + window, :, None]*arr[j:j + window, None, :], [32, 32]))
    return np.concatenate(y, axis=0)


arr = np.random.random((1000,2560,))

The code below is run in a Jupyter notebook cell to check the execution time. It can also be timed in a script using timeit.

%%time
print(rebinner(arr, window=10).shape)

Expected Output:

(1000, 32, 32)
CPU times: user 8.52 s, sys: 4.58 s, total: 13.1 s
Wall time: 13.1 s
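For timing outside a notebook, a minimal timeit sketch could look like the following. It repeats the two functions from the MWE without tqdm, and it uses a much smaller array than the one above so that it finishes quickly. The array size, the number of runs, and the names n_runs and seconds are assumptions made for illustration only.

```python
import timeit

import numpy as np


def rebin(arr, new_shape):
    # Block-average the last two axes down to new_shape.
    shape = (arr.shape[0], new_shape[0], arr.shape[1] // new_shape[0],
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)


def rebinner(arr, window):
    # Process `window` rows at a time to bound the size of the outer product.
    y = []
    for j in range(0, arr.shape[0], window):
        y.append(rebin(arr[j:j + window, :, None] * arr[j:j + window, None, :],
                       [32, 32]))
    return np.concatenate(y, axis=0)


# Assumption: a reduced size so the sketch runs in well under a second.
arr = np.random.random((40, 640))

# Average wall time over a few repetitions.
n_runs = 3
seconds = timeit.timeit(lambda: rebinner(arr, window=10), number=n_runs) / n_runs

print(rebinner(arr, window=10).shape)
print(f"mean wall time per call: {seconds:.3f} s")
```

The absolute numbers from this reduced example are not comparable to the timings quoted here; it only shows the measurement pattern.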

UPDATE 1:

Output using the Numba library as suggested by @JohnZwinck

Based on the comments from @JohnZwinck, I updated the code slightly and added Numba decorators. However, the new script throws an assertion error and I am not sure what is causing it. Below are the updated code and the corresponding error message.

import numba
import numpy as np


@numba.njit(nopython=True)
def rebinner2(arr, new_shape):
    shape = (arr.shape[0], new_shape[0], arr.shape[1] // new_shape[0],
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)


@numba.njit(nopython=True)
def rebinner1(arr, window):
    return [rebinner2(np.random.random((window, 1, 2560))*np.random.random((window, 2560, 1)),
                      [32, 32]) for j in range(0, arr.shape[0], window)]
    # return [rebinner2(arr[j:j + window, :, None]*arr[j:j + window, None, :],
    # [32, 32]) for j in range(0, arr.shape[0], window)]


def rebinner(arr, window):
    return np.concatenate(rebinner1(arr,window), axis=0)


if __name__ == "__main__":
    arr = np.random.random((1000, 2560))
    print(rebinner(arr, window=10).shape)
    # print(rebinner1(arr, window=10).shape)

Output:

venv/lib/python3.6/site-packages/numba/core/decorators.py:252: RuntimeWarning: nopython is set for njit and is ignored
  warnings.warn('nopython is set for njit and is ignored', RuntimeWarning)
Traceback (most recent call last):
  File "numba_tester.py", line 28, in <module>
    print(rebinner(arr, window=10).shape)
  File "numba_tester.py", line 23, in rebinner
    return np.concatenate(rebinner1(arr,window), axis=0)
  File "venv/lib/python3.6/site-packages/numba/core/dispatcher.py", line 415, in _compile_for_args
    error_rewrite(e, 'typing')
  File "venv/lib/python3.6/site-packages/numba/core/dispatcher.py", line 358, in error_rewrite
    reraise(type(e), e, None)
  File "venv/lib/python3.6/site-packages/numba/core/utils.py", line 80, in reraise
    raise value.with_traceback(tb)
numba.core.errors.TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Failed in nopython mode pipeline (step: nopython frontend)
- Resolution failure for literal arguments:
AssertionError()
- Resolution failure for non-literal arguments:
AssertionError()

During: resolving callee type: BoundFunction(array.mean for array(float64, 5d, C))
During: typing of call at numba_tester.py (9)


File "numba_tester.py", line 9:
def rebinner2(arr, new_shape):
    <source elided>
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)
    ^

During: resolving callee type: type(CPUDispatcher(<function rebinner2 at 0x7f6c5743e730>))
During: typing of call at numba_tester.py (17)

During: resolving callee type: type(CPUDispatcher(<function rebinner2 at 0x7f6c5743e730>))
During: typing of call at numba_tester.py (17)


File "numba_tester.py", line 17:
def rebinner1(arr, window):
    <source elided>
    return [rebinner2(np.random.random((window, 1, 2560))*np.random.random((window, 2560, 1)),
                      [32, 32]) for j in range(0, arr.shape[0], window)]

Notes: The above code works fine without the Numba decorators. I simplified rebinner1 by passing random arrays to rebinner2 instead of the actual slice-transpose-multiply operation, to check whether that operation caused the issue for Numba, but it did not.

UPDATE 2:

Modified the Numba code so that it runs, but there is no performance gain.

As suggested by @JohnZwinck, I changed njit to jit, which disables nopython mode. However, when checking the execution time, the Numba version now performs slightly worse (probably because disabling nopython mode loses the optimization).

With Numba:

(5000, 32, 32)
real:   1m11.538s
user:   0m49.137s
sys :   0m22.872s

Without Numba:

(5000, 32, 32)
real:   1m2.439s
user:   0m41.721s
sys :   0m21.152s

Upvotes: 0

Views: 325

Answers (1)

John Zwinck

Reputation: 249093

Just remove the unnecessary tqdm (progress bar) and add Numba to speed it up like this (untested, but should be close to what you need):

import numba
import numpy as np

@numba.njit
def rebinner2(arr, new_shape):
    shape = (arr.shape[0], new_shape[0], arr.shape[1] // new_shape[0],
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)

@numba.njit
def rebinner1(arr, window):
    # One rebinned block per window of rows.
    return [rebinner2(arr[j:j + window, :, None]*arr[j:j + window, None, :], [32, 32])
            for j in range(0, arr.shape[0], window)]

def rebinner(arr, window):
    # Concatenate outside Numba (see the note on the axis argument below).
    return np.concatenate(rebinner1(arr, window), axis=0)

I wrapped np.concatenate() in a non-Numba function because I'm not sure if Numba supports the axis argument.

Using multiple cores for this is probably pointless unless your data are truly gigantic, in which case you should simply load a subset of the data in each process. But Numba will make the loop far faster than multiple cores.
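For readers who still want to try several processes, the idea of giving each process its own subset of the rows can be sketched roughly as below. This is only an illustration under stated assumptions, not a tuned implementation: the names rebin_chunk and rebinner_parallel and the processes parameter are hypothetical, and the [32, 32] target shape is taken from the question. Each worker builds its own (window, 2560, 2560) intermediate, which is about 524 MB of float64 data for window=10, so peak memory grows with the number of processes.

```python
import multiprocessing as mp

import numpy as np


def rebin(arr, new_shape):
    # Same block-averaging as in the question.
    shape = (arr.shape[0], new_shape[0], arr.shape[1] // new_shape[0],
             new_shape[1], arr.shape[2] // new_shape[1])
    return arr.reshape(shape).mean(-1).mean(2)


def rebin_chunk(chunk):
    # Runs inside a worker: build the outer products for one window of rows
    # and reduce them right away, so only the small result is sent back.
    return rebin(chunk[:, :, None] * chunk[:, None, :], [32, 32])


def rebinner_parallel(arr, window, processes=None):
    # Each window of rows is an independent task.
    chunks = [arr[j:j + window] for j in range(0, arr.shape[0], window)]
    if processes == 1:
        # Hypothetical convenience: a serial path that is easier to debug.
        results = [rebin_chunk(c) for c in chunks]
    else:
        with mp.Pool(processes=processes) as pool:
            # pool.map returns results in input order, so the rows stay aligned.
            results = pool.map(rebin_chunk, chunks)
    return np.concatenate(results, axis=0)
```

In a script, the call (for example rebinner_parallel(arr, window=10, processes=4)) should sit under an if __name__ == "__main__": guard, because platforms that start workers by spawning re-import the main module. Whether this beats the single-process loop depends on core count and available memory, so it is worth timing both.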

Upvotes: 1
