matttree

Reputation: 135

Improving runtime of python numpy code

I have code which reassigns bins to a large numpy array. Basically, the elements of the large array have been sampled at different frequencies, and the final goal is to rebin the entire array at the fixed bins freq_bins. The code is kind of slow for the array I have. Is there any good way to improve the runtime of this code? A factor of a few would do for now. Maybe some numba magic would do.

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    sky_by_cap = np.einsum('ij, jk->ijk', boost_factor[i],es)
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

Upvotes: 2

Views: 657

Answers (3)

max9111

Reputation: 6482

Keep the code simple and then optimize

If you have an idea of what algorithm you want to code, write a simple reference implementation first. From there you can go two ways in Python: you can try to vectorize the code, or you can compile it to get good performance.

Even if np.einsum or np.add.at were implemented in Numba, it would be very hard for any compiler to make efficient binary code from your example.

The only thing I have rewritten is a more efficient digitize for scalar values.

Edit

In the Numba source code there is a more efficient implementation of digitize for scalar values.

Code

import numpy as np
import numba as nb

#From Numba source
#Copyright (c) 2012, Anaconda, Inc.
#All rights reserved.

@nb.njit(fastmath=True)
def digitize(x, bins, right=False):
    # bins are monotonically-increasing
    n = len(bins)
    lo = 0
    hi = n

    if right:
        if np.isnan(x):
            # Find the first nan (i.e. the last from the end of bins,
            # since there shouldn't be many of them in practice)
            for i in range(n, 0, -1):
                if not np.isnan(bins[i - 1]):
                    return i
            return 0
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] < x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid
    else:
        if np.isnan(x):
            # NaNs end up in the last bin
            return n
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] <= x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid

    return lo
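# Note: the simpler scalar digitize below shares the same name, so whichever
# definition comes last is the one inner_loop will actually compile against;
# keep only the version you want to benchmark.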

@nb.njit(fastmath=True)
def digitize(value, bins):
  if value<bins[0]:
    return 0

  if value>=bins[bins.shape[0]-1]:
    return bins.shape[0]

  for l in range(1,bins.shape[0]):
    if value>=bins[l-1] and value<bins[l]:
      return l

@nb.njit(fastmath=True,parallel=True)
def inner_loop(boost_factor,freq_bins,es):
  res=np.zeros((boost_factor.shape[0],freq_bins.shape[0]),dtype=np.float64)
  for i in nb.prange(boost_factor.shape[0]):
    for j in range(boost_factor.shape[1]):
      for k in range(freq_bins.shape[0]):
        ind=nb.int64(digitize(boost_factor[i,j]*freq_bins[k],freq_bins))
        res[i,ind]+=boost_factor[i,j]*es[j,k]*freq_bins[ind]
  return res

@nb.njit(fastmath=True)
def calc_nb(division,freq_division,cd,boost_factor,freq_bins,es):
  final_emit = np.empty((division, division, freq_division),np.float64)
  for i in range(division):
    final_emit[i,:,:]=inner_loop(boost_factor[i],freq_bins,es)
  return final_emit
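Called on the arrays from the question, a minimal usage sketch looks like this (the first call includes Numba's compilation time, so time a second call for a fair comparison):

# Hypothetical usage with the arrays defined in the question
final_emit = calc_nb(division, freq_division, cd, boost_factor, freq_bins, es)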

Performance

(Quadcore i7)
original_code: 118.5s
calc_nb: 4.14s
#with digitize implementation from Numba source
calc_nb: 2.66s

Upvotes: 1

abarnert

Reputation: 365707

This seems to be trivially parallelizable:

  • You've got an outer loop that you run 90 times.
  • Each time, you're not mutating any shared arrays except final_emit
    • … and that, only to store into a unique row.
  • It looks like most of the work inside the loop is numpy array-wide operations, which will release the GIL.

So (using the futures backport of concurrent.futures, since you seem to be on 2.7):

import numpy as np
import time
import futures

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row

If this works, there are two tweaks to try, either of which might be more efficient. We don't really care which order the results come back in, but map queues them up in order. This can waste a bit of space and time. I don't think it will make much difference (presumably, the vast majority of your time is spent doing the calculations, not writing out the results), but without profiling your code, it's hard to be sure. So, there are two easy ways around this problem.


Using as_completed lets us use the results in whatever order they finish, rather than in the order we queued them. Something like this:

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division)]
    for fut in futures.as_completed(fs):
        i, row = fut.result()
        final_emit[i] = row

Alternatively, we can make the function insert the rows directly, instead of returning them. This means we're now mutating a shared object from multiple threads. So I think we need a lock here, although I'm not positive (numpy's rules are a bit complicated, and I haven't read your code that thoroughly…). But that probably won't hurt performance significantly, and it's easy. So:

import numpy as np
import threading
# etc.
final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    x.map(dostuff, xrange(division))

That max_workers=8 in all of my examples should be tuned for your machine. Too many threads is bad, because they start fighting each other instead of parallelizing; too few threads is even worse, because some of your cores just sit there idle.

If you want this to run on a variety of machines, rather than tuning it for each one, the best guess (for 2.7) is usually:

import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:

But if you want to squeeze the max performance out of a specific machine, you should test different values. In particular, for a typical quad-core laptop with hyperthreading, the ideal value can be anywhere from 4 to 8, depending on the exact work you're doing, and it's easier to just try all the values than to try to predict.
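For example, a quick sweep over a few worker counts makes the comparison concrete; this is only a sketch, reusing dostuff and the futures import from above and discarding the results since we only care about the timing:

import time

# Hypothetical timing sweep; reuses dostuff and the futures setup from above.
for workers in (2, 4, 6, 8):
    t0 = time.time()
    with futures.ThreadPoolExecutor(max_workers=workers) as x:
        list(x.map(dostuff, xrange(division)))  # force all tasks to complete
    print(workers, time.time() - t0)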

Upvotes: 1

konstant

Reputation: 705

I think you get a small boost in performance by replacing einsum with plain broadcast multiplication.

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = boost_factor[i][:, :, None]*freq_bins[None, None, :]
    sky_by_cap = boost_factor[i][:, :, None]*es[None, :, :]
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

Your code is rather slow at np.add.at, which I believe could be made much faster with np.bincount, although I couldn't quite get it to work for the multidimensional arrays you have. Maybe someone here can add to that.
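For reference, one way np.bincount could replace the np.add.at line is to offset each row's bin indices so that a single flat bincount accumulates all rows at once. This is only a sketch under the same index-range assumptions as the original code (indices stay below freq_division), and I haven't benchmarked it:

# Hypothetical replacement for the np.add.at(...) line above:
# offset each row's bin indices so one flat bincount sums every row at once.
n_rows = freq_index_reshaped.shape[0]
flat_index = (freq_index_reshaped + np.arange(n_rows)[:, None] * freq_division).ravel()
to_bin_emit = np.bincount(flat_index,
                          weights=sky_by_cap_reshaped.ravel(),
                          minlength=n_rows * freq_division).reshape(n_rows, freq_division)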

Upvotes: 0
