Reputation: 203
Please assume the following NumPy array:
A = array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
I would like to find the indices of this array that N
consecutive values are equal to zero (inclusive).
For example, assume N=3
. We know that A[2]=0
while A[3]>0
. Thus, the second element of array A
is not with three consecutive zero value (inclusive). A desirable outcome for array A would look like the below:
B = array([False, False, False, False, True, True, True, True, False, False])
I can write a loop as the answer to this question:
N = 3
A = np.array([1, 1, 0, 0, 0, 0, 0, 0, 0, 0])
B = np.zeros(len(A), dtype=bool)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = True
Since matrix A could be much larger and I need to do the above search process millions of times, I need the fastest possible way. What would be your suggestion?
Upvotes: 5
Views: 625
Reputation: 26886
This is a significant improvement over the original approach as long as the block size N
(below referred to as m
or size
) is large enough, and the input has a significant number of non-zero values:
import numpy as np
import numba as nb
@nb.njit
def find_blocks_while2_nb(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
Essentially it is a Numba accelerated version of your original approach with a number of micro-optimizations:
For small block sizes, more micro-optimized approach may be faster as introduced in other answers, and briefly discussed below.
The problem of finding the indices where a block of identical values starts can be seen as a specialization of a classical problem in computer science: finding a sub-array within an array (discussed in some detail here).
In this case, the sub-string (i.e. the block) consists of a series of identical values, more precisely 0
s.
The complexity of this problem depends, in general, on the size of the array (n
) and the size of the block (m
or size
), as well as the actual values within them (obviously).
Of course, it is assumed that n > m
, as it is clear that if n == m
the solution is trivial, and when n < m
the array cannot fit the block.
For the same reason, no index above n - m
can be found.
While a naïve implementation would require essentially two nested loops, one for string and one for sub-string (which would result is O(nm)
computational complexity), it is possible to make use of the fact that the block consists of identical values to run the algorithm in guaranteed linear time O(n)
and with constant memory requirement O(1)
.
However, under the assumption that the input array has a mixed distribution of both block and non-block values, it is possible to run in approx. O(n / (m - ε))
time and still with O(1)
memory consumption. The ε < m - 1
is a parameter proportional to the probability of finding a non-block value.
The idea is that when trying to determine if a block starts at position i
if the array has a non-block value at position j
with j - i < m
then the next position to evaluate is j + 1
, since, the block cannot fit between i
and j
.
Moreover, if the search starts from the back (i.e. at position i - m
), then the number of position that are not checked is maximal for sufficiently heterogeneous inputs, as all j - i
values do not need to be checked.
For similar reasons, if it is known that a block starts at position i
, to check whether a block starts at position i + j
with j < m
, only the values between i + m
and i + m + j
needs to checked, and the values between i
and i + m - 1
do not need to checked again.
This a manual case where a generator is a perfect fit, because it is possible to leverage its lazy evaluation properties to perform the bit of computation only as needed. Additionally, it is not known in advance how large the result will be (i.e. it is possible to find many blocks or no blocks at all).
However, if this is part of a larger computation it may be faster to pre-allocate the size of the result and just modify the value its indices.
There should not be a need to allocate more than n - m + 1
values.
However, for compatibility with the requirements of the question, an array the size of the input is pre-allocated.
If needed, it is easy to adapt the following implementations to avoid wasting this extra memory.
More precisely, the solutions based on explicit looping can be converted to generators by replacing result[i] = True
with yield i
(or similar).
Likewise, solutions that compute the boolean array natively can be converted to generators with something like:
for i in np.nonzero(result)[0]:
yield i
The remainder of the analysis will assume that the target result is the boolean array.
One of the challenges of implementing this in pure Python is that explicit looping is relatively slow. For this reason, it is convenient to use some acceleration technique (e.g. Numba or Cython) to reduce computation time.
Alternatively, given that the algorithm consists essentially of two loops (one for the array and one for the sub-array) one may seek to vectorize one or both loops.
However, do note that these approaches would necessarily run in O(nm)
worst-case time.
The basic naïve approach is the following:
def find_blocks_for2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
all_equal = True
for j in range(i, i + size):
if arr[j] != value:
all_equal = False
break # vectorized approaches may not have this
if all_equal:
result[i] = True
return result
This can be improved in a number of directions. One way stems from the fact that the inner loop is actually useless, because it is sufficient to keep track of the number of consecutive "good" values present (this is essentially the same as MichaelSzczesny's answer):
import numpy as np
def find_blocks_for(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
return result
Note that this can be seen as a specialization of the Rabin-Karp algorithm where the role of the hashing is played by the block_size
counter.
Another possible approach is to skip the second loop only when the last iteration already identified the presence of a block and have it running backward to maximize the jump to the next possible block:
import numpy as np
def find_blocks_while2(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
i = j = 0
while i < n - size + 1:
# last iteration found a block, only checking one value
if j < i and arr[i + size - 1] == value:
result[i] = True
i += 1
else:
# search backward for index of last non-block value
j = i + size - 1
while j >= i and arr[j] == value:
j -= 1
if j < i:
# block found, advance by 1
result[i] = True
i += 1
else:
# block not found, advance by last non-block value occurrence
j += 1
i = j
return result
Note that this can be seen as a specialization of the Knuth-Morris-Pratt algorithm without the need of computing offsets for each value of the block (they are all the same).
A number of variations can be devised for the above approaches, all with the same asymptotic behavior but slightly different running time depending on the actual input. For example, one could change the order of the conditions to speed-up the case where many consecutive blocks are found, at the expenses of running more instructions elsewhere, or the other way around.
All these run just too slow without acceleration either with Numba, Cython or Pypy. Pypy uses a different interpreter and I will not discuss/benchmark it further.
Numba provides one of the most straightforward and effective acceleration (just apply a decorator), e.g.:
import numba as nb
find_blocks_while2_nb = nb.njit(find_blocks_while2)
find_blocks_while2_nb.__name__ = "find_blocks_while2_nb"
Cython can also be used, e.g. (using %load_ext Cython
Jupyter magic), but to take full advantage of the compilation it would require extra care to make sure bottleneck code runs without Python interaction:
%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import cython as cy
import numpy as np
cdef _find_blocks_lin_cy(long[::1] arr, char[::1] result, Py_ssize_t n, Py_ssize_t size, long value):
cdef Py_ssize_t block_size = 0
for i in range(n):
if arr[i] == value:
block_size += 1
else:
block_size = 0
if block_size >= size:
result[i - size + 1] = True
def find_blocks_lin_cy(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
_find_blocks_lin_cy(arr, result, n, size, value)
return result
Whether it is going to be faster Numba or Cython depends on how much optimization can be triggered by the code, and as such depends also on which combination of compiler and CPU is used.
Alternatively, either the inner or the outer loop (or both) can be vectorized. Of course this is most effective on whether it is run more the inner or the outer loop. The largest loop should be run vectorized for maximum speed-up.
Vectorizing the inner loop would lead to something close to this (similar to the original post, but with smaller boundaries and sum()
replaced with np.all()
which also sports short-circuiting):
def find_blocks_for_all(arr, size, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
for i in range(n - size + 1):
if np.all(arr[i:i + size] == value):
result[i] = True
return result
Vectorizing the outer loop would lead to something close to this (similar to what is presented in DanielF's answer, but with overall cleaner code, especially avoiding unnecessary function calls):
def find_blocks_for_and(arr, size, value):
result = (arr == value)
for i in range(1, size):
result[:-1] &= result[1:]
result[1 - size:] = False
return result
Vectorizing both loops would lead to something close to this (similar to one approach from NaphatAmundsen's answer):
def find_blocks_strides(arr, size, value):
n = arr.size
strides = (arr.itemsize,) * 2
block = np.full(size, value, dtype=arr.dtype)
# note that `as_strided()` does not allocate extra memory
# the downside is that buffer overflow will occur and
# extra care must be taken not to use data outside the boundaries
check = np.lib.stride_tricks.as_strided(arr, (n, size), strides)
result = np.all(check == block, axis=-1)
result[1 - size:] = False
return result
The above code can be further specialized under the assumption that the input will contain only positive values and that block consists of zeros only.
This would mean replacing np.all()
calls with np.sum()
or similar.
One approach where this is actually beneficial is with the fully vectorized approach, where as_strided()
can be replaced by computing the correlation with a non-zero block (similar to one approach from NaphatAmundsen's answer):
def find_blocks_0_conv(arr, size):
n = arr.size
result = np.zeros_like(arr, dtype=np.bool_)
block = np.ones(size, dtype=arr.dtype)
result[:1 - size] = (np.correlate(arr, block, 'valid') == 0)
return result
Aside of the looping, the running time will also depend on how the code translates to hardware instructions and ultimately how fast are them. For example, if NumPy is compiled with full support of SIMD instructions, it may very well be that a vectorized (but not theoretically optimal) approach would outperform, for many input combinations, theoretically optimal approaches that are naïvely accelerated. The same could happen for approaches that are written in a way that acceleration techniques can make very good use of SIMD instructions. However, the extra speed they provide will be limited to specific systems (i.e. specific combinations of compilers and processors).
One such example is the following (essentially a polishing of Jérôme Richard's answer to avoid global variables and removing unnecessary looping):
def find_blocks_simd_nb_cached(arr, size, value=0, cache={}):
assert size > 0
if size not in cache:
print('cached: ', list(cache.keys()))
def find_blocks_simd_nb(arr, value):
n = arr.size
result = np.zeros(n, dtype=np.bool_)
check = (arr == value)
for i in range(n - size + 1):
# using a `temp` variable avoid unnecessarily accessing the `check` array
temp = check[i]
for j in range(1, size):
temp &= check[i + j]
result[i] = temp
return result
cache[size] = nb.njit(find_blocks_simd_nb)
return cache[size](arr, value)
Here, some benchmarks are reported. As always, they should be taken with a grain of salt.
Only the fastest approaches are considered, for varying input sizes, block sizes and values.
The _nb
and _cy
(except for find_blocks_for_cy()
are essentially non-optimized versions of the same functions without the _nb
or _cy
suffix from above.
Note that:
find_blocks_for_all()
is actually fast for sufficiently large values of size
find_blocks_for_and()
is very fast for smaller values of size
and does not require Numba or Cythonfind_blocks_for_nb()
is essentially independent on the block size and performs overall really wellfind_blocks_simd_nb()
is very fast for small values of size
but performs comparatively poorly for larger valuesfind_blocks_while2_nb()
really shines for larger values of size
and it is respectably fast for smaller values(Full benchmarks available here).
Upvotes: 6
Reputation: 2041
This one is numpy only and O(len(A)) similar to the answer from Ali_Sh but shorter and quicker. It does require some awkward handling of the edges of the array. The choice for a smaller dtype for the cumsum is a micro-optimization that I'm not sure is worth it..
def numpy_linear(A, N):
dtype = np.min_scalar_type(N)
assert issubclass(dtype.type, np.unsignedinteger)
# The cumsum can overflow for longer arrays
# but the difference calculation will still be correct
c = np.cumsum(A == 0, dtype=dtype)
B = np.r_[c[N-1] == N, c[N:]-c[:-N] == N, np.zeros(N-1, bool)]
return B
Upvotes: 1
Reputation: 2816
Finding consecutive values and related indices is one of subjects that have solution in other SO question. In this regard we can find range indices for repeated zeros and their numbers:
def zero_runs(a):
iszero = np.concatenate(([0], np.equal(a, 0).view(np.int8), [0]))
absdiff = np.abs(np.diff(iszero))
ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
return ranges
# [[ 2 3]
# [ 4 10]]
When we found the indices ranges, we can fill the 1D array by using
def filled_array(start, end, length):
out = np.zeros((length), dtype=int)
np.add.at(out, start, 1)
np.add.at(out, end, -1)
return out.cumsum() > 0
def filled_array_v2(start, end, length):
out =np.bincount(start, minlength=length) - np.bincount(end, minlength=length)
return out.cumsum().astype(bool)
So, by using the aforementioned functions and defining a master function as below:
def final(A, N):
runs = zero_runs(A)
repeats_cound = runs[:, 1] - runs[:, 0]
# [1 6]
suspected_ranges = runs[repeats_cound >= 3]
# [[ 4 10]]
start = suspected_ranges[:, 0]
end = start + N + 1
return filled_array(start, end, length=A.size) # filled_array_v2(start, end, length=A.size)
# [False False False False True True True True False False]
In one of my benchmarks on data array volume 10000
, this code ran faster than the proposed numba method by Naphat.
By using any
(instead sum
) as @Mercury's loop optimization mentioned, we can improve Naphat's baseline_numba by parallelizing it usingnb.prange
along with parallel=True
in the decorator line.
By A size: 20000 on Colab:
# N ----> 3:
1000 loops, best of 5: 105 µs per loop Norok
--------------------
100 loops, best of 5: 35 ms per loop Nephat baseline
100 loops, best of 5: 730 µs per loop Nephat baseline_numba
1000 loops, best of 5: 225 µs per loop Nephat correlational
1000 loops, best of 5: 397 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 42.3 µs per loop Michael Szczesny (The fastest)
--------------------
1000 loops, best of 5: 352 µs per loop This answer
1000 loops, best of 5: 279 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 10:
1000 loops, best of 5: 25.3 µs per loop Norok (The fastest)
--------------------
100 loops, best of 5: 46.8 ms per loop Nephat baseline
100 loops, best of 5: 794 µs per loop Nephat baseline_numba
1000 loops, best of 5: 318 µs per loop Nephat correlational
1000 loops, best of 5: 433 µs per loop Nephat stridetricks
--------------------
1000 loops, best of 5: 31.9 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 341 µs per loop This answer
1000 loops, best of 5: 291 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 20
1000 loops, best of 5: 6.52 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.5 µs per loop Michael Szczesny
--------------------
1000 loops, best of 5: 350 µs per loop This answer
1000 loops, best of 5: 292 µs per loop Nephat baseline_numba PARALLELIZED
=================================================================
# N ----> 30
1000 loops, best of 5: 4.01 µs per loop Norok (The fastest)
--------------------
1000 loops, best of 5: 31.8 µs per loop Michael Szczesny
Latest benchmarks:
The following results show the best performances, respectively, based on my last test on the prepared colab link (the last two cells):
N <10
---> Richard, (==
) Daniel10 < N <20
---> Richard, Michael, (==
) Norok20 < N <60
---> Norok, Richard, MichaelN >60
---> Norok, Michael, RichardUpvotes: 2
Reputation: 50298
For small values of N
(not changing much at runtime), the best solution is to use a SIMD-friendly Numba implementation that is specialized for each possible specific N
value. The compiler can generate a very efficient code when N <= ~30
. For larger N
values, the solution of @norok2 is start to be better since it can skip many items of the array as long as there are not too many 0 items. When the number of 0 is big, this function is still competitive as long as N <= 64
. When N > 64
, please use a better suited implementation.
# Cache dictionary meant to store function for a given N
cache = {}
def compute(A, N):
assert N > 0
if N not in cache:
# Numba function to be compiled for a specific N
def compute_specific(A):
out = np.zeros(A.size, dtype=np.bool_)
isZero = (A == 0).view(np.uint8)
for i in range(isZero.size-N+1):
allSame = isZero[i]
for j in range(1, N):
allSame &= isZero[i+j]
out[i] = allSame
for i in range(A.size-N+1, A.size):
out[i] = 0
return out
cache[N] = nb.njit(compute_specific)
return cache[N](A)
The compiler can auto-vectorize this code with SIMD instruction (like AVX-2) so that 32~64 items are computed per cycle on modern x86 processors. Larger N
values require more SIMD instructions causing the function to be less efficient. In comparison, implementations using conditional branches tends to be slow since a miss-predicted branch generally cost about 10~15 cycles on mainstream x86 processors and each iteration can only compute one scalar item at a time (ie. ~2 order of magnitude slower assuming the amount of work would be the same).
Here are results on my machine (Intel Xeon Skylake) with a random (32-bit) integer arrays of size 100_000. There is about 50% of 0 values in the array. The Numba compilation time is excluded in all implementations.
With N = 3:
- Initial solution: 115_000 µs
- Ali_Sh: 1_575 µs
- Naphat (stridetricks): 929 µs
- Naphat (correlational): 528 µs
- Michael Szczesny: 350 µs
- Norok2: 306 µs
- Franciska: 295 µs
- This code: 11 us <---
With N = 10:
- Initial solution: 149_000 µs
- Ali_Sh: 1_592 µs
- Naphat (stridetricks): 1_139 µs
- Naphat (correlational): 831 µs
- Michael Szczesny: 489 µs
- Franciska: 444 µs
- Norok2: 98 µs
- This code: 14 µs <---
With N = 30:
- Ali_Sh: [FAIL]
- Initial solution: 255_000 µs
- Franciska: 2_285 µs
- Naphat (correlational): 1_936 µs
- Naphat (stridetricks): 1_628 µs
- Michael Szczesny: 647 µs
- Norok2: 30 µs
- This code: 25 µs <---
With N = 60:
- Ali_Sh: [FAIL]
- Initial solution: 414_000 µs
- Naphat (correlational): 3_814 µs
- Franciska: 3_242 µs
- Naphat (stridetricks): 3_048 µs
- Michael Szczesny: 816 µs
- This code: 50 µs <---
- Norok2: 14 µs
We can clearly see that this function is the fastest one (by a large margin) except when N is big (in this case, the Norok2's code is faster).
Upvotes: 6
Reputation: 14399
Should be possible with only bitwise math. Not sure about speed, probably doesn't beat numba
, but should top anything with a for
loop otherwise.
def find_blocks(A, N):
reduce = lambda x: np.logical_and(x[:-1], x[1:], out = x[:-1])
x = np.logical_not(A)
for i in range(N-1):
reduce(x)
x[-N+1:] = False
return x
find_blocks(A, 3)
Out[]:
array([False, False, False, False, True, True, True, True, False,
False])
Upvotes: 3
Reputation: 505
You can use a convolution filter, to see the consecutive zeros. And as the "leftover" zeros at the end can't be True, if there isn't an N length sequence of them, you can always complete the result with Falses.
import numpy as np
N = 3
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
conv=np.convolve(np.ones((N)),A,mode='valid')
B=np.where(conv==0,True,False)
B=np.append(B,np.zeros((N-len(A)%N),dtype=bool))
the result is:
[False False False False True True True True False False]
Upvotes: 3
Reputation: 5036
A solution with linear time complexity, compiled with numba
.
The idea is to look at each element of A once and count occurrences of zeros in counter c. If a non-zero element is encountered, c is reset to 0. As long as c >= N
the result array is set to True
at the appropriate index.
import numba as nb # tested with numba 0.55.1
@nb.jit
def numba_linear(A, N):
B = np.zeros(len(A), dtype=np.bool_)
c = 0
for i in range(len(A)):
c = c+1 if A[i] == 0 else 0
if c >= N: B[i - N + 1] = 1
return B
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
numba_linear(A, 3)
Output
array([False, False, False, False, True, True, True, True, False,
False])
Benchmark against @Nephat baseline_numba
with @Mercury's loop optimization applied (sum
was faster than any
in my benchmarks).
import numpy as np
@nb.njit
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in nb.prange(len(A)-N+1):
if A[i:i + N].sum() == 0:
B[i] = 1
return B
rng = np.random.default_rng()
A = rng.integers(2, size=20000)
N = 3
%timeit baseline_numba(A, N)
#10000 loops, best of 5: 120 µs per loop
%timeit numba_linear(A, N)
#10000 loops, best of 5: 29.4 µs per loop
np.testing.assert_array_equal(numba_linear(A,N), baseline_numba(A,N))
Upvotes: 4
Reputation: 1623
I have implemented a few other alternatives that you could try:
numba
to JIT compile the loopy codenp.correlate
to do a signal-processing type of correlation, then use nonzero
to obtain indicesstride_tricks
to do the correlation insteadI would like to benchmark the solutions, but it is getting a bit late here. I can maybe do it after work tomorrow if you'd like.
I also use dtype=np.int8
for B
since it is easier to visually confirm that the output is correct (see output).
import numpy as np
import numba
from numpy.lib.stride_tricks import as_strided
A = np.array([1, 1, 0, 1, 0, 0, 0, 0, 0, 0])
N = 3
def baseline(A: np.ndarray):
'''
Your solution
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (sum(A[i:i + N]) == 0):
B[i] = 1
return B
@numba.njit()
def baseline_numba(A: np.ndarray, N: int):
'''
You may need to run this once first to see performance gains
as this has to be JIT compiled by Numba
'''
B = np.zeros(len(A), dtype=np.int8)
for i in range(len(A)):
if (i + N <= len(A)) and (A[i:i + N].sum() == 0):
B[i] = 1
return B
def correlational(A: np.ndarray):
'''
Use a correlation operation
'''
B = np.zeros_like(A, dtype=np.int8)
B[(np.correlate(A, np.full(N, 1), 'valid') == 0).nonzero()[0]] = 1
return B
def stridetricks(A: np.ndarray):
'''
Same idea as using correlation, but with stride tricks
Maybe it will be faster? Probably not.
'''
u = np.array(A.itemsize)
B = np.zeros_like(A, dtype=np.int8)
B[(as_strided(A, (len(A) - N + 1, N), u * (1, 1)).sum(1) == 0).nonzero()[0]] = 1
return B
print(baseline(A))
print(baseline_numba(A, N))
print(correlational(A))
print(stridetricks(A))
Output:
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
[0 0 0 0 1 1 1 1 0 0]
Upvotes: 3
Reputation: 37
To Obtain one's index, you can start with the following method:
import bumpy as np
my__Truth = np.array([True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, False, True])
np.sort(my_Truth, in_place=my_Truth[0])
# ~ Obtain Index ~ #
index_values = np.where(my_Truth)
print(index_values)
# un-comment if using python2
# print index_values
Upvotes: 0