2cynykyl
2cynykyl

Reputation: 1083

Convert an array of peaks to a series of steps that represent to most recent peak value

Given an array of peaks like this:

peaks = [0, 5, 0, 3, 2, 0, 1, 7, 0]

How can I create an array of steps that indicate the most recent peak value, like this:

steps = [0, 5, 5, 3, 3, 3, 3, 7, 7]

Requirements:


Note

I recently asked a question that turned out to be a dupe (easily solved with scipy.maximum.accumulate), but my question also contained an optional 'would be nice if' twist, as outlined above. It turns out that I actually have a need for this second behavior as well, so I'm re-posting just this part.

Upvotes: 3

Views: 107

Answers (1)

Paul Panzer
Paul Panzer

Reputation: 53089

Here is a solution that handles ND and can detect "broad peaks" like ..., 0, 4, 4, 4, 3, ... but not ..., 0, 4, 4, 4, 7, ....

import numpy as np
import operator as op

def keep_peaks(A, axis=-1):
    B = np.swapaxes(A, axis, -1)
    # take differences between consecutive elements along axis
    # pad with -1 at the start and the end
    # the most efficient way is to allocate first, because otherwise
    # padding would involve reallocation and a copy
    # note that in order to avoid that copy we use np.subtract and its
    # out kwd
    updown = np.empty((*B.shape[:-1], B.shape[-1]+1), B.dtype)
    updown[..., 0], updown[..., -1] = -1, -1
    np.subtract(B[..., 1:], B[..., :-1], out=updown[..., 1:-1])
    # extract indices where the there is a change along axis
    chnidx = np.where(updown)
    # get the values of the changes
    chng = updown[chnidx]
    # find indices of indices 1) where we go up and 2) the next change is
    # down (note how the padded -1's at the end are useful here)
    # also include the beginning of each 1D subarray
    pkidx, = np.where((chng[:-1] > 0) & (chng[1:] < 0) | (chnidx[-1][:-1] == 0))
    # use indices of indices to retain only peak indices
    pkidx = (*map(op.itemgetter(pkidx), chnidx),)
    # construct array of changes of the result along axis
    # these will be zero everywhere
    out = np.zeros_like(A)
    aux = out.swapaxes(axis, -1)
    # except where there is a new peak
    # at these positions we need to put the differences of peak levels
    aux[(*map(op.itemgetter(slice(1, None)), pkidx),)] = np.diff(B[pkidx])
    # we could ravel the array and do the cumsum on that, but raveling
    # a potentially noncontiguous array is expensive
    # instead we keep the shape, at the cost of having to replace the
    # value at the beginning of each 2D subarray (we do not need the
    # "line-jump" difference but the plain 1st value there)
    aux[..., 0] = B[..., 0]
    # finally, use cumsum to go from differences to plain values
    return out.cumsum(axis=axis)

peaks = [0, 5, 0, 3, 2, 0, 1, 7, 0]

print(peaks)
print(keep_peaks(peaks))

# show off axis kwd and broad peak detection
peaks3d = np.kron(np.random.randint(0, 10, (3, 6, 3)), np.ones((1, 2, 1), int))

print(peaks3d.swapaxes(1, 2))
print(keep_peaks(peaks3d, 1).swapaxes(1, 2))

Sample run:

[0, 5, 0, 3, 2, 0, 1, 7, 0]
[0 5 5 3 3 3 3 7 7]
[[[5 5 3 3 1 1 4 4 9 9 7 7]
  [2 2 9 9 3 3 4 4 3 3 7 7]
  [9 9 0 0 2 2 5 5 7 7 9 9]]

 [[1 1 3 3 9 9 3 3 7 7 0 0]
  [1 1 1 1 4 4 5 5 0 0 3 3]
  [5 5 5 5 8 8 1 1 2 2 7 7]]

 [[6 6 3 3 8 8 2 2 3 3 2 2]
  [6 6 9 9 3 3 9 9 3 3 9 9]
  [1 1 5 5 7 7 2 2 7 7 1 1]]]
[[[5 5 5 5 5 5 5 5 9 9 9 9]
  [2 2 9 9 9 9 4 4 4 4 7 7]
  [9 9 9 9 9 9 9 9 9 9 9 9]]

 [[1 1 1 1 9 9 9 9 7 7 7 7]
  [1 1 1 1 1 1 5 5 5 5 3 3]
  [5 5 5 5 8 8 8 8 8 8 7 7]]

 [[6 6 6 6 8 8 8 8 3 3 3 3]
  [6 6 9 9 9 9 9 9 9 9 9 9]
  [1 1 1 1 7 7 7 7 7 7 7 7]]]

Upvotes: 2

Related Questions