Alfe

Reputation: 59516

Max in a sliding window in NumPy array

I want to create an array which holds the max() of each position of a window moving through a given numpy array. I'm sorry if this sounds confusing. I'll give an example. Input:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]

My output with a window width of 5 shall be this:

[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

Each number shall be the max of a subarray of width 5 of the input array:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]
  \       /                 \       /
   \     /                   \     /
    \   /                     \   /
     \ /                       \ /
[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

I did not find an out-of-the-box function within numpy which would do this (but I would not be surprised if there was one; I'm not always thinking in the terms the numpy developers thought). I considered creating a shifted 2D-version of my input:

[ [ 6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1 ]
  [ 4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9 ]
  [ 8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4 ]
  [ 7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3 ]
  [ 1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ] ]

Then I could apply np.max(input, 0) on this and would get my results. But this does not seem efficient in my case because both my array and my window width can be large (>1000000 entries and >100000 window width). The data would be blown up more or less by a factor of the window width.

I also considered using np.convolve() in some fashion but couldn't figure out a way to achieve my goal with it.

Any ideas how to do this efficiently?

Upvotes: 18

Views: 20279

Answers (7)

kylefinn

Reputation: 2403

I ran into this same problem and I think the existing answers overlook a major optimization opportunity. If the input has size N and the window size is W, then these are all O(N * W). However, computing the max over these overlapping segments is highly redundant, and the cost can be reduced to O(N log(W)) with a multi-pass doubling approach.

import numpy as np

def sliding_max(x, win):
    # Doubling passes: after each pass, element i holds the max of a
    # span of cur_win consecutive input values starting at i.
    cur_win = 1
    while cur_win < win:
        step = min(cur_win, win - cur_win)   # don't overshoot the target width
        x = np.maximum(x[step:], x[:-step])  # merge two overlapping spans
        cur_win += step
        print(f"{step=} {cur_win=} {x=}")
    return x

win = 5
x = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])

sliding_max(x, win)

prints

step=1 cur_win=2 x=array([6, 8, 8, 7, 4, 4, 5, 7, 7, 4, 6, 6, 2, 3, 5, 6, 6, 4, 7, 7, 9, 9,
       4, 3])
step=2 cur_win=4 x=array([8, 8, 8, 7, 5, 7, 7, 7, 7, 6, 6, 6, 5, 6, 6, 6, 7, 7, 9, 9, 9, 9])
step=1 cur_win=5 x=array([8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9])

It scales very well:

win = 100000
x = -np.arange(10000000)

%timeit sliding_max(x, win)
> 205 ms ± 334 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

And sanity check of course:

x = np.random.randint(0, 9999, size=10000)
win = 99

y = sliding_max(x, win)

y2 = np.zeros(len(y))
for i in range(len(x)-win+1): 
    y2[i] = np.max(x[i:i+win])

assert np.all(y == y2)

Upvotes: 0

Xavier Guihot

Reputation: 61716

Starting in NumPy 1.20, sliding_window_view provides a way to slide/roll through windows of elements; you can then take the max of each window:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])
np.max(sliding_window_view(values, window_shape = 5), axis = 1)
# array([8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9])

where:

  • window_shape is the size of the sliding window
  • np.max(array, axis = 1) finds the max for each sub-array

and the intermediate result of the sliding is:

sliding_window_view(values, window_shape = 5)
# array([[6, 4, 8, 7, 1],
#        [4, 8, 7, 1, 4],
#        [8, 7, 1, 4, 3],
#        ...
#        [7, 1, 9, 4, 3],
#        [1, 9, 4, 3, 2]])

Upvotes: 6

Kim BoChan

Reputation: 1

If you have two-dimensional data, for example stock prices, and want a rolling max (or mean, std, ...), this works, calculating without explicit iteration:

n = 5  # size of rolling window

# data has shape (series, time); stack n time-shifted copies along a new axis
data_expanded = np.expand_dims(data, 1)
data_shift = [np.roll(data_expanded, shift=-i, axis=2) for i in range(n)]
data_shift = np.concatenate(data_shift, axis=1)

# note: np.roll wraps around, so the last n-1 positions mix in values
# from the start of the series
data_max = np.max(data_shift, axis=1)  # max, mean, std...
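A small worked example (made-up numbers, not from the original answer) showing the shape flow, including the caveat that np.roll wraps around, so only the first T - n + 1 columns correspond to full windows:

```python
import numpy as np

n = 3
data = np.array([[0, 1, 2, 3, 4],
                 [5, 6, 7, 8, 9]])       # shape (series, time) = (2, 5)

data_expanded = np.expand_dims(data, 1)  # (2, 1, 5)
data_shift = [np.roll(data_expanded, shift=-i, axis=2) for i in range(n)]
data_shift = np.concatenate(data_shift, axis=1)  # (2, 3, 5)

data_max = np.max(data_shift, axis=1)        # (2, 5), tail wraps around
valid = data_max[:, :data.shape[1] - n + 1]  # keep only full windows
print(valid)
# [[2 3 4]
#  [7 8 9]]
```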

Upvotes: 0

Alfe

Reputation: 59516

I have tried several variants now and would declare the Pandas version the winner of this performance race. Among the variants was a binary tree (implemented in pure Python) for quickly computing maxes of arbitrary subranges (source available on demand). The best algorithm I came up with myself was a plain rolling window using a ring buffer; its max only needed to be recomputed completely if the current max value was dropped from the window in this iteration; otherwise it would remain or increase to the next new value. Compared with the old libraries, this pure-Python implementation was faster than the rest.

In the end I found that the version of the libraries in question was highly relevant. The rather old versions I was mainly still using were way slower than the modern versions. Here are the numbers for 1M numbers, rollingMax'ed with a window of size 100k:

         old (slow HW)           new (better HW)
scipy:   0.9.0:  21.2987391949   0.13.3:  11.5804400444
pandas:  0.7.0:  13.5896410942   0.18.1:   0.0551438331604
numpy:   1.6.1:   1.17417216301  1.8.2:    0.537392139435

Here is the implementation of the pure numpy version using a ringbuffer:

import numpy as np

def rollingMax(a, window):
  def eachValue():
    w = a[:window].copy()  # ring buffer holding the current window
    m = w.max()
    yield m
    i = 0
    j = window
    while j < len(a):
      oldValue = w[i]
      newValue = w[i] = a[j]
      if newValue > m:
        m = newValue   # new value raises the max
      elif oldValue == m:
        m = w.max()    # the max dropped out of the window; recompute
      yield m
      i = (i + 1) % window
      j += 1
  return np.array(list(eachValue()))

For my input this works great because I'm handling audio data with lots of peaks in all directions. If you put a constantly decreasing signal into it (e.g. -np.arange(10000000)), you will experience the worst case (and maybe you should reverse the input and the output in such cases).

I just include this in case someone wants to do this task on a machine with old libraries.

Upvotes: 5

Abdou

Reputation: 13274

Pandas has a rolling method for both Series and DataFrames, and that could be of use here:

import pandas as pd

lst = [6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2]
lst1 = pd.Series(lst).rolling(5).max().dropna().tolist()

# [8.0, 8.0, 8.0, 7.0, 7.0, 8.0, 8.0, 8.0, 8.0, 8.0, 6.0, 6.0, 6.0, 6.0, 6.0, 7.0, 7.0, 9.0, 9.0, 9.0, 9.0]

For consistency, you can coerce each element of lst1 to int:

[int(x) for x in lst1]

# [8, 8, 8, 7, 7, 8, 8, 8, 8, 8, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9]
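Equivalently (a small variant, my addition rather than part of the original answer), the cast can stay inside pandas via astype:

```python
import pandas as pd

lst = [6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2]
result = pd.Series(lst).rolling(5).max().dropna().astype(int).tolist()
print(result)
# [8, 8, 8, 7, 7, 8, 8, 8, 8, 8, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9]
```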

Upvotes: 16

Guido

Reputation: 104

First of all, I think there is a mistake in your explanation: the 10th element of the input array is 2 at the beginning of your explanation, but 8 in the shifted 2D version below it.

After correcting that, I think the following code does what you want:

import numpy as np

a = np.array([6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])
window = 5
b = np.empty(len(a) - window + 1, dtype=a.dtype)  # one result per full window
for i in range(len(a) - window + 1):
    b[i] = np.amax(a[i:i+window])

I think this way is better than creating a shifted 2D version of your input, because such a version needs much more memory than the original array, so you may run out of memory if the input is large.

Upvotes: 1

Divakar

Reputation: 221624

Approach #1 : You could use the 1D max filter from SciPy -

from scipy.ndimage.filters import maximum_filter1d

def max_filter1d_valid(a, W):
    hW = (W-1)//2 # Half window size
    return maximum_filter1d(a,size=W)[hW:-hW]
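As a sanity check against the question's example (a sketch; note that recent SciPy versions expose this function as scipy.ndimage.maximum_filter1d, while the scipy.ndimage.filters path is deprecated):

```python
import numpy as np
from scipy.ndimage import maximum_filter1d  # modern import path

def max_filter1d_valid(a, W):
    hW = (W - 1) // 2  # half window size
    return maximum_filter1d(a, size=W)[hW:-hW]

a = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])
out = max_filter1d_valid(a, W=5)
print(out)
# [8 8 8 7 7 7 7 7 7 6 6 6 6 6 6 7 7 9 9 9 9]
```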

Approach #2 : Here's another approach with strides: use strided_app to create a 2D shifted version as a view into the array pretty efficiently, which should let us use any custom reduction operation along the second axis afterwards -

def max_filter1d_valid_strided(a, W):
    return strided_app(a, W, S=1).max(axis=1)
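strided_app is a helper defined in one of the author's other posts rather than here; a sketch along the usual as_strided lines (an assumption, not necessarily the exact original) would be:

```python
import numpy as np
from numpy.lib.stride_tricks import as_strided

def strided_app(a, L, S):
    """View of a as overlapping rows of length L with stride S -- no copy."""
    nrows = (a.size - L) // S + 1
    n = a.strides[0]
    return as_strided(a, shape=(nrows, L), strides=(S * n, n))

windows = strided_app(np.arange(6), L=3, S=1)
print(windows.max(axis=1))
# [2 3 4 5]
```

Since the result is a view, reducing it with .max(axis=1) avoids materializing the window-width-fold blown-up copy the question worried about.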

Runtime test -

In [55]: a = np.random.randint(0,10,(10000))

# @Abdou's solution using pandas rolling
In [56]: %timeit pd.Series(a).rolling(5).max().dropna().tolist()
1000 loops, best of 3: 999 µs per loop

In [57]: %timeit max_filter1d_valid(a, W=5)
    ...: %timeit max_filter1d_valid_strided(a, W=5)
    ...: 
10000 loops, best of 3: 90.5 µs per loop
10000 loops, best of 3: 87.9 µs per loop

Upvotes: 11
