baptiste pagneux

Reputation: 251

improving code efficiency: standard deviation on sliding windows

I am trying to improve a function that calculates, for each pixel of an image, the standard deviation of the pixels located in its neighborhood. My function uses two nested loops to run across the matrix, and it is the bottleneck of my program. I guess there is likely a way to improve it by getting rid of the loops thanks to numpy, but I don't know how to proceed. Any advice is welcome!

regards

import numpy as np

def sliding_std_dev(image_original, radius=5):
    height, width = image_original.shape
    result = np.zeros_like(image_original)  # initialize the output matrix
    hgt = range(radius, height - radius)
    wdt = range(radius, width - radius)
    for i in hgt:
        for j in wdt:
            # Standard deviation of the (2*radius)x(2*radius) window around (i, j)
            result[i, j] = np.std(image_original[i-radius:i+radius, j-radius:j+radius])
    return result

Upvotes: 24

Views: 11865

Answers (5)

nneonneo

Reputation: 179532

Cool trick: you can compute the standard deviation given just the sum of squared values and the sum of values in the window.

Therefore, you can compute the standard deviation very fast using a uniform filter on the data:

from scipy.ndimage import uniform_filter

def window_stdev(arr, radius):
    # Windowed mean and windowed mean of squares, via a box (uniform) filter.
    c1 = uniform_filter(arr, radius*2, mode='constant', origin=-radius)
    c2 = uniform_filter(arr*arr, radius*2, mode='constant', origin=-radius)
    # std = sqrt(E[X^2] - E[X]^2); drop the trailing rows/columns whose windows
    # ran off the edge and were filled with the constant padding.
    return ((c2 - c1*c1)**.5)[:-radius*2+1, :-radius*2+1]

This is ridiculously faster than the original function. For a 1024x1024 array and a radius of 20, the old function takes 34.11 seconds, and the new function takes 0.11 seconds, a speed-up of 300-fold.


How does this work mathematically? It computes the quantity sqrt(mean(x^2) - mean(x)^2) for each window. We can derive this quantity from the standard deviation sqrt(mean((x - mean(x))^2)) as follows:

Let E be the expectation operator (basically mean()), and X be the random variable of data. Then:

E[(X - E[X])^2]
= E[X^2 - 2X*E[X] + E[X]^2]
= E[X^2] - E[2X*E[X]] + E[E[X]^2] (by the linearity of the expectation operator)
= E[X^2] - 2E[X]*E[X] + E[X]^2 (again by linearity, and the fact that E[X] is a constant)
= E[X^2] - E[X]^2

which shows that E[X^2] - E[X]^2 equals the variance E[(X - E[X])^2]; taking its square root therefore gives exactly the standard deviation computed the usual way.
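
As a quick sanity check of the formula in practice (a small sketch, assuming the window_stdev function above is in scope; the indices follow from the origin and slicing choices made there):

import numpy as np
from scipy.ndimage import uniform_filter

a = np.random.rand(40, 40)
r = 3
fast = window_stdev(a, r)
# fast[0, 0] should match a direct np.std over the first (2r x 2r) window.
print(np.allclose(fast[0, 0], np.std(a[0:2*r, 0:2*r])))   # True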

Upvotes: 40

Ben

Reputation: 311

After attempting to use several of the excellent solutions here, I was running into trouble with data that contained NaNs. Both the uniform_filter and np.cumsum() solutions were causing NaNs to propagate through the output array instead of just being ignored.

My solution below essentially just swaps the windowed sum function in @Jaime's answer for a convolution, which is robust to NaNs.

import numpy as np
from scipy.ndimage import convolve

def windowed_sum(arr: np.ndarray, radius: int) -> np.ndarray:
    """radius=1 means the pixel itself and the 8 surrounding pixels"""

    kernel = np.ones((radius * 2 + 1, radius * 2 + 1), dtype=int)
    return convolve(arr, kernel, mode="constant", cval=0.0)

def windowed_var(arr: np.ndarray, radius: int) -> np.ndarray:
    """Note: this returns smaller in size than the input array (by radius)"""

    diameter = radius * 2 + 1
    win_sum = windowed_sum(arr, radius)[radius:-radius, radius:-radius]
    win_sum_2 = windowed_sum(arr * arr, radius)[radius:-radius, radius:-radius]
    return (win_sum_2 - win_sum * win_sum / diameter / diameter) / diameter / diameter

def windowed_std(arr: np.ndarray, radius: int) -> np.ndarray:

    output = np.full_like(arr, np.nan, dtype=np.float64)

    var_arr = windowed_var(arr, radius)
    std_arr = np.sqrt(var_arr)
    output[radius:-radius, radius:-radius] = std_arr

    return output

This performs a little slower than the uniform_filter approach, but still MUCH faster than many other methods (stacking arrays, iteration, etc.).

>>> data = np.random.random((1024, 1024))
>>> %timeit windowed_std(data, 4)
158 ms ± 695 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

For comparison, the uniform_filter approach takes around 36 ms on the same size of data.

With some NaNs:

data = np.arange(100, dtype=np.float64).reshape(10, 10)
data[3:4, 3:4] = np.nan
windowed_std(data, 1)

array([[ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan],
       [ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21,  nan,  nan,  nan, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21,  nan,  nan,  nan, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21,  nan,  nan,  nan, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21,  nan],
       [ nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan,  nan]])

Upvotes: 0

Jaime

Reputation: 67447

The method most often used for this kind of thing in image processing is the summed area table, an idea introduced by Crow in a 1984 paper. The idea is that when you compute a quantity by summing over a window and then move the window, e.g. one pixel to the right, you don't need to add up all the items in the new window again: you only need to subtract the leftmost column from the total and add the new rightmost column. So if you create an accumulated sum array over both dimensions of your array, you can get the sum over any window with a few additions and subtractions of table entries. If you keep summed area tables for your array and for its square, it's very easy to get the variance from those two. Here's an implementation:

import numpy as np

def windowed_sum(a, win):
    # Summed area table: cumulative sum over both axes.
    table = np.cumsum(np.cumsum(a, axis=0), axis=1)
    win_sum = np.empty(tuple(np.subtract(a.shape, win-1)))
    win_sum[0, 0] = table[win-1, win-1]
    win_sum[0, 1:] = table[win-1, win:] - table[win-1, :-win]
    win_sum[1:, 0] = table[win:, win-1] - table[:-win, win-1]
    # Four-corner lookup for all interior window positions.
    win_sum[1:, 1:] = (table[win:, win:] + table[:-win, :-win] -
                       table[win:, :-win] - table[:-win, win:])
    return win_sum

def windowed_var(a, win):
    # var = E[X^2] - E[X]^2, with win*win samples per window.
    win_a = windowed_sum(a, win)
    win_a2 = windowed_sum(a*a, win)
    return (win_a2 - win_a * win_a / win / win) / win / win

To see that this works:

>>> a = np.arange(25).reshape(5,5)
>>> windowed_var(a, 3)
array([[ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333],
       [ 17.33333333,  17.33333333,  17.33333333]])
>>> np.var(a[:3, :3])
17.333333333333332
>>> np.var(a[-3:, -3:])
17.333333333333332
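
To make the four-corner lookup concrete, here is the same bookkeeping spelled out for a single 3x3 block (a small illustration using the cumulative-sum table directly):

import numpy as np

a = np.arange(25).reshape(5, 5)
table = np.cumsum(np.cumsum(a, axis=0), axis=1)
# Sum of the block a[1:4, 1:4] via the four corners of the summed area table:
print(table[3, 3] - table[0, 3] - table[3, 0] + table[0, 0])  # 108
print(a[1:4, 1:4].sum())                                      # 108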

This should run a couple of notches faster than convolution-based methods.
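
If you want the standard deviation rather than the variance, take the square root of the windowed variance. A minimal sketch, assuming the windowed_var function above (the helper name is hypothetical; the clipping guards against tiny negative values from floating-point cancellation):

def windowed_stddev(a, win):
    # Clip tiny negative round-off before taking the square root.
    return np.sqrt(np.clip(windowed_var(a, win), 0, None))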

Upvotes: 13

Daniel

Reputation: 19547

You can first obtain the indices and then use np.take to form the new array:

import numpy as np

def new_std_dev(image_original, radius=5):
    cols, rows = image_original.shape

    # First obtain the indices for the top-left position
    diameter = np.arange(radius*2)
    x, y = np.meshgrid(diameter, diameter)
    index = np.ravel_multi_index((y, x), (cols, rows)).ravel()

    # Cast this in two dimensions and take the stdev
    index = index + np.arange(rows-radius*2)[:, None] + np.arange(cols-radius*2)[:, None, None]*(rows)
    data = np.std(np.take(image_original, index), -1)

    # Add the zeros back to the output array
    top = np.zeros((radius, rows-radius*2))
    sides = np.zeros((cols, radius))

    data = np.vstack((top, data, top))
    data = np.hstack((sides, data, sides))
    return data

First generate some random data and check timings:

a=np.random.rand(50,20)

print(np.allclose(new_std_dev(a), sliding_std_dev(a)))
True

%timeit sliding_std_dev(a)
100 loops, best of 3: 18 ms per loop

%timeit new_std_dev(a)
1000 loops, best of 3: 472 us per loop

For larger arrays it's always faster, as long as you have enough memory:

a=np.random.rand(200,200)

print(np.allclose(new_std_dev(a), sliding_std_dev(a)))
True

%timeit sliding_std_dev(a)
1 loops, best of 3: 1.58 s per loop

%timeit new_std_dev(a)
10 loops, best of 3: 52.3 ms per loop

The original function is faster for very small arrays; the break-even point looks to be around hgt*wdt > 50. One thing to note: your function takes square frames and places the std dev at the bottom-right index, rather than sampling around the index. Is this intentional?

Upvotes: 1

Joe Kington

Reputation: 284752

First off, there's more than one way to do this.

It's not the most efficient speed-wise, but using scipy.ndimage.generic_filter will allow you to easily apply an arbitrary python function over a moving window.

As a quick example:

result = scipy.ndimage.generic_filter(data, np.std, size=2*radius)

Note that the boundary conditions can be controlled with the mode kwarg.
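
For instance, to pad the edges with a constant value instead of the default reflection (just an illustration of the mode kwarg; 'reflect', 'nearest', 'mirror', and 'wrap' are the other options):

result = scipy.ndimage.generic_filter(data, np.std, size=2*radius, mode='constant', cval=0.0)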


Another way to do this is to use some striding tricks to make a view of the array that's effectively a moving window, and then apply np.std along the last axis. (Note: this is taken from one of my previous answers here: https://stackoverflow.com/a/4947453/325565)

import numpy as np

def strided_sliding_std_dev(data, radius=5):
    windowed = rolling_window(data, (2*radius, 2*radius))
    shape = windowed.shape
    windowed = windowed.reshape(shape[0], shape[1], -1)
    return windowed.std(axis=-1)

def rolling_window(a, window):
    """Takes a numpy array *a* and a sequence of (or single) *window* lengths
    and returns a view of *a* that represents a moving window."""
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp's post to numpy-discussion.
    <http://www.mail-archive.com/[email protected]/msg29450.html>"""
    if window < 1:
        raise ValueError("`window` must be at least 1.")
    if window > a.shape[-1]:
        raise ValueError("`window` is too long.")
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

It's a bit hard to understand what's going on here at first glance. Not to plug one of my own answers, but I don't want to re-type the explanation, so have a look here: https://stackoverflow.com/a/4924433/325565 if you haven't seen these sorts of "striding" tricks before.
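
As a quick shape check (a small example, assuming the helper functions above):

a = np.arange(16).reshape(4, 4)
w = rolling_window(a, (2, 2))
print(w.shape)  # (3, 3, 2, 2): one 2x2 window for every valid position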

If we compare timings with a 100x100 array of random floats with a radius of 5, it's ~10x faster than the original or the generic_filter version. However, you have no flexibility in the boundary conditions with this version. (It's identical to what you're currently doing, while the generic_filter version gives you lots of flexibility at the expense of speed.)

# Your original function with nested loops
In [21]: %timeit sliding_std_dev(data)
1 loops, best of 3: 237 ms per loop

# Using scipy.ndimage.generic_filter
In [22]: %timeit ndimage_std_dev(data)
1 loops, best of 3: 244 ms per loop

# The "stride-tricks" version above
In [23]: %timeit strided_sliding_std_dev(data)
100 loops, best of 3: 15.4 ms per loop

# Ophion's version that uses `np.take`
In [24]: %timeit new_std_dev(data)
100 loops, best of 3: 19.3 ms per loop

The downside to the "stride-tricks" version is that unlike "normal" strided rolling window tricks, this version does make a copy, and it's much larger than the original array. You will run into memory problems if you use this on a large array! (On a side note, it's basically equivalent to @Ophion's answer in terms of memory use and speed. It's just a different approach to doing the same thing.)
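
To put a rough number on the memory cost: for a 1024x1024 float64 array with radius=5, the reshaped window array has shape (1015, 1015, 100), i.e. about 1015 * 1015 * 100 * 8 bytes ≈ 0.8 GB, versus roughly 8 MB for the input. In other words, the blow-up factor is about the window area (here ~100x).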

Upvotes: 3
