Max Linke

Reputation: 1735

Parallelize loop over numpy rows

I need to apply the same function to every row of a numpy array and store the results in another numpy array.

import numpy as np

# states will contain results of function applied to each row of array
states = np.empty_like(array)

for i, ar in enumerate(array):
    states[i] = function(ar, *args)

# do some other stuff on states

function does some non-trivial filtering of my data and returns an array that reflects where the conditions are True and where they are False. function can be either pure Python or Cython-compiled. The filtering operations on the rows are complicated and can depend on previous values in the row, which means I can't operate on the whole array element by element.
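For illustration only (this exact function is not from my code), such a row-dependent filter might look like:

import numpy as np

# purely hypothetical stand-in for the real `function`:
# keep a value where it exceeds the running mean of the row so far, zero it otherwise
def function(row, factor=1.0):
    out = np.empty_like(row)
    running_sum = 0.0
    for j, value in enumerate(row):
        running_mean = running_sum / j if j > 0 else value
        out[j] = value if value > factor * running_mean else 0
        running_sum += value
    return out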

Is there a way to do something like this in dask for example?

Upvotes: 5

Views: 3618

Answers (2)

MRocklin

Reputation: 57251

Dask solution

You could do this with dask.array by chunking the array by row, calling map_blocks, and then computing the result:

import dask.array as da

ar = ...
x = da.from_array(ar, chunks=(1, ar.shape[1]))  # one chunk per row
y = x.map_blocks(function, *args)               # lazily apply function block-wise
states = y.compute()
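One detail worth checking: map_blocks hands function a 2-D block of shape (1, n), not a 1-D row. If function expects a bare row, as in the question's loop, a small wrapper (block_function is a hypothetical name) can adapt it:

def block_function(block, *args):
    # each block has shape (1, n): unwrap the row, then restore the leading axis
    return function(block[0], *args)[None, :]

y = x.map_blocks(block_function, *args)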

By default this will use threads; you can use processes in the following way:

from dask.multiprocessing import get
states = y.compute(get=get)
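As an aside, the get= keyword reflects the Dask API at the time this answer was written; in more recent Dask releases the same thing is spelled with the scheduler= keyword:

states = y.compute(scheduler='processes')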

Pool solution

However, dask is probably overkill for an embarrassingly parallel computation like this; you could get by with a thread pool:

from multiprocessing.pool import ThreadPool

import numpy as np

pool = ThreadPool()

ar = ...
states = np.empty_like(ar)

def f(i):
    # threads share memory, so each worker can write its row in place
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
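One caveat: a thread pool only speeds this up if function releases the GIL for most of its runtime, for example by spending its time in numpy operations or in Cython code compiled with nogil; a pure-Python function will largely serialize on the GIL.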

And you could switch to processes with the following change:

from multiprocessing import Pool
pool = Pool()

Note, however, that processes do not share memory with the parent, so the in-place writes to states from inside f would be lost; with a process pool, collect the return values of pool.map instead, as sketched below.
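A minimal sketch of the process-based variant, assuming function and args are defined at module level (multiprocessing has to pickle the worker function):

from multiprocessing import Pool

import numpy as np

def f(row):
    # return the result instead of writing into a shared array
    return function(row, *args)

if __name__ == '__main__':
    pool = Pool()
    states = np.array(pool.map(f, ar))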

Upvotes: 7

Neil G

Reputation: 33202

Turn your function into a universal function: http://docs.scipy.org/doc/numpy/reference/ufuncs.html.

Then: states = function(array, *args).
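Since a classic ufunc works element by element, a row-wise function like the one in the question is really closer to a generalized ufunc. A minimal sketch of that shape-aware wrapping via np.vectorize's signature argument, assuming function maps a length-n row to a length-n row (note that np.vectorize is a convenience loop and adds no parallelism):

import numpy as np

# bind the extra arguments, then vectorize over the last axis
row_wise = np.vectorize(lambda row: function(row, *args), signature='(n)->(n)')
states = row_wise(array)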

Upvotes: 0
