Reputation: 1809
Suppose that I have a NumPy array of integers.
arr = np.random.randint(0, 1000, 1000)
And I have two arrays lowers and uppers, which represent lower and upper bounds respectively on slices of arr. These intervals are overlapping and variable-length; however, lowers and uppers are both guaranteed to be non-decreasing.
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])
I want to find the min and the max of each slice of arr defined by lowers and uppers, and store these in another array.
out_arr = np.empty((lowers.size, 2))
What is the most efficient way to do this? I'm worried there is not a vectorized approach, as I can't see how to get around indexing in a loop.
My current approach is just the straightforward
for i in range(lowers.size):
    arr_v = arr[lowers[i]:uppers[i]]
    out_arr[i,0] = np.amin(arr_v)
    out_arr[i,1] = np.amax(arr_v)
which leaves me with a desired result like
In [304]: out_arr
Out[304]:
array([[  26.,  908.],
       [  18.,  993.],
       [   0.,  968.],
       [   3.,  999.],
       [   1.,  998.],
       [   0.,  994.]])
but this is far too slow on my actual data.
Upvotes: 6
Views: 350
Reputation: 53089
OK, here is how to at least downsize the original problem using np.minimum.reduceat:
lu = np.r_[lowers, uppers]
so = np.argsort(lu)
iso = np.empty_like(so)
iso[so] = np.arange(len(so))
cut = len(lowers)
lmin = np.minimum.reduceat(arr, lu[so])
for i in range(cut):
    print(min(lmin[iso[i]:iso[cut+i]]), min(arr[lowers[i]:uppers[i]]))
# 33 33
# 7 7
# 5 5
# 0 0
# 3 3
# 7 7
What this doesn't achieve is getting rid of the main loop, but at least the data are reduced from a 1000-element array to a 12-element one.
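For reference, here is a tiny toy example (made-up data, not from the question) of np.minimum.reduceat's semantics, which both snippets in this answer rely on: entry j of the result is the minimum of a[indices[j]:indices[j+1]], the last entry reduces to the end of the array, and if indices[j] >= indices[j+1] the entry is simply a[indices[j]].
import numpy as np
a = np.array([4, 9, 2, 7, 5, 1, 8])
idx = np.array([0, 2, 5])
print(np.minimum.reduceat(a, idx))
# [4 2 1]  i.e. [min(a[0:2]), min(a[2:5]), min(a[5:])]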
Update:
With small overlaps, @Eric Hansen's own solutions are hard to beat. I'd still like to point out that if there are substantial overlaps, it may even be worthwhile to combine both methods. I don't have numba, so below is just a twopass version that combines my preprocessing with Eric's pure numpy solution, which also serves as a benchmark in the form of onepass:
import numpy as np
from timeit import timeit
def twopass(lowers, uppers, arr):
    # sort all boundaries together and remember each boundary's rank
    lu = np.r_[lowers, uppers]
    so = np.argsort(lu)
    iso = np.empty_like(so)
    iso[so] = np.arange(len(so))
    cut = len(lowers)
    # minima of the segments between consecutive sorted boundaries
    lmin = np.minimum.reduceat(arr, lu[so])
    # second reduceat over the segment minima, one (lower, upper) rank pair per interval
    return np.minimum.reduceat(lmin, iso.reshape(2,-1).T.ravel())[::2]

def onepass(lowers, uppers, arr):
    # interleave lowers and uppers and reduce directly over arr
    mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
    mixture[::2] = lowers; mixture[1::2] = uppers
    return np.minimum.reduceat(arr, mixture)[::2]
arr = np.random.randint(0, 1000, 1000)
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])
print('small')
for f in twopass, onepass:
    print('{:18s} {:9.6f} ms'.format(f.__name__,
          timeit(lambda: f(lowers, uppers, arr),
                 number=10)*100))
arr = np.random.randint(0, 1000, 10**6)
lowers = np.random.randint(0, 8*10**5, 10**4)
uppers = np.random.randint(2*10**5, 10**6, 10**4)
swap = lowers > uppers
lowers[swap], uppers[swap] = uppers[swap], lowers[swap]
print('large')
for f in twopass, onepass:
    print('{:18s} {:10.4f} ms'.format(f.__name__,
          timeit(lambda: f(lowers, uppers, arr),
                 number=10)*100))
Sample run:
small
twopass              0.030880 ms
onepass              0.005723 ms
large
twopass                74.4962 ms
onepass              3153.1575 ms
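For completeness: the same preprocessing extends to the max side via np.maximum.reduceat. Below is a hypothetical twopass_minmax (a sketch only, not part of the benchmark above) that returns the (n, 2) min/max layout asked for in the question.
def twopass_minmax(lowers, uppers, arr):
    # sketch: same boundary sort as twopass, applied to both minima and maxima
    lu = np.r_[lowers, uppers]
    so = np.argsort(lu)
    iso = np.empty_like(so)
    iso[so] = np.arange(len(so))
    order = iso.reshape(2, -1).T.ravel()
    lmin = np.minimum.reduceat(arr, lu[so])
    lmax = np.maximum.reduceat(arr, lu[so])
    return np.column_stack((np.minimum.reduceat(lmin, order)[::2],
                            np.maximum.reduceat(lmax, order)[::2]))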
Upvotes: 3
Reputation: 1809
An improved version of my original attempt, based on Paul Panzer's suggestion of reduceat, is:
mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
mixture[::2] = lowers; mixture[1::2] = uppers
np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
                 np.maximum.reduceat(arr, mixture)[::2]))
On a sample size comparable to my actual data, this runs in 4.22 ms on my machine compared to my original solution taking 73 ms.
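As a quick sanity check (assuming arr, mixture and the loop-computed out_arr from above are still in scope), the two results can be compared directly:
res = np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
                       np.maximum.reduceat(arr, mixture)[::2]))
print(np.array_equal(res, out_arr))  # expect True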
Even faster, though, is to just use Numba with my original solution:
from numba import jit
@jit
def get_res():
    out_arr = np.empty((lowers.size, 2))
    for i in range(lowers.size):
        arr_v = arr[lowers[i]:uppers[i]]
        out_arr[i,0] = np.amin(arr_v)
        out_arr[i,1] = np.amax(arr_v)
    return out_arr
which runs in 100 microseconds on my machine.
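A variant that takes the arrays as explicit arguments, so the compiled function can be reused on other inputs, is sketched below (hypothetical name get_res_args; assumes numba's njit, i.e. nopython mode, is available):
from numba import njit
import numpy as np

@njit
def get_res_args(arr, lowers, uppers):
    # same loop as above, but with the arrays passed in explicitly
    out_arr = np.empty((lowers.size, 2))
    for i in range(lowers.size):
        arr_v = arr[lowers[i]:uppers[i]]
        out_arr[i, 0] = arr_v.min()
        out_arr[i, 1] = arr_v.max()
    return out_arr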
Upvotes: 1
Reputation: 441
The execution is slow because inside the loop the sub-array is extracted and the min/max operations are carried out one slice at a time. You can write the entire loop as a single line:
out_array = np.array([(np.amin(arr[lowers[i]:uppers[i]]),
                       np.amax(arr[lowers[i]:uppers[i]]))
                      for i in range(lowers.size)])
Upvotes: -1