Reputation: 251
I am trying to improve a function that calculates, for each pixel of an image, the standard deviation of the pixels located in its neighborhood. My function uses two nested loops to run across the matrix, and it is the bottleneck of my program. I guess there is likely a way to improve it by getting rid of the loops thanks to numpy, but I don't know how to proceed. Any advice is welcome!
regards
import numpy as np

def sliding_std_dev(image_original, radius=5):
    height, width = image_original.shape
    result = np.zeros_like(image_original)  # initialize the output matrix
    hgt = range(radius, height - radius)
    wdt = range(radius, width - radius)
    for i in hgt:
        for j in wdt:
            result[i, j] = np.std(image_original[i-radius:i+radius, j-radius:j+radius])
    return result
Upvotes: 24
Views: 11865
Reputation: 179532
Cool trick: you can compute the standard deviation given just the sum of squared values and the sum of values in the window.
Therefore, you can compute the standard deviation very fast using a uniform filter on the data:
from scipy.ndimage import uniform_filter

def window_stdev(arr, radius):
    c1 = uniform_filter(arr, radius*2, mode='constant', origin=-radius)
    c2 = uniform_filter(arr*arr, radius*2, mode='constant', origin=-radius)
    return ((c2 - c1*c1)**.5)[:-radius*2+1, :-radius*2+1]
This is ridiculously faster than the original function. For a 1024x1024 array and a radius of 20, the old function takes 34.11 seconds, and the new function takes 0.11 seconds, a speed-up of 300-fold.
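If you want to time it on your own machine, a minimal sketch along these lines should do (the array size and radius simply mirror the figures above; absolute numbers will of course vary with hardware):

import time
import numpy as np

data = np.random.rand(1024, 1024)
start = time.perf_counter()
window_stdev(data, 20)
print("window_stdev: %.3f s" % (time.perf_counter() - start))

The original sliding_std_dev can be timed the same way for comparison.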
How does this work mathematically? It computes the quantity sqrt(mean(x^2) - mean(x)^2) for each window. We can derive this quantity from the standard deviation sqrt(mean((x - mean(x))^2)) as follows:

Let E be the expectation operator (basically mean()), and X be the random variable of the data. Then:

E[(X - E[X])^2]
  = E[X^2 - 2X*E[X] + E[X]^2]
  = E[X^2] - E[2X*E[X]] + E[E[X]^2]    (by the linearity of the expectation operator)
  = E[X^2] - 2E[X]*E[X] + E[X]^2       (again by linearity, and the fact that E[X] is a constant)
  = E[X^2] - E[X]^2

which proves that the quantity computed using this technique is mathematically equivalent to the standard deviation.
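If you want to convince yourself numerically, a quick sketch (any random data will do) shows the two formulations agree:

import numpy as np

x = np.random.rand(1000)
assert np.allclose(np.var(x), np.mean(x**2) - np.mean(x)**2)
assert np.allclose(np.std(x), np.sqrt(np.mean(x**2) - np.mean(x)**2))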
Upvotes: 40
Reputation: 311
After attempting to use several of the excellent solutions here, I was running into trouble with data that contained NaN's. Both the uniform_filter and np.cumsum() solutions were causing NaN's to propagate through the output array instead of just being ignored.

My solution below essentially just swaps the windowed sum function in @Jaime's answer for a convolution, which keeps NaN's from propagating beyond the windows that actually contain them.
import numpy as np
from scipy.ndimage import convolve

def windowed_sum(arr: np.ndarray, radius: int) -> np.ndarray:
    """radius=1 means the pixel itself and the 8 surrounding pixels"""
    kernel = np.ones((radius * 2 + 1, radius * 2 + 1), dtype=int)
    return convolve(arr, kernel, mode="constant", cval=0.0)

def windowed_var(arr: np.ndarray, radius: int) -> np.ndarray:
    """Note: this returns an array smaller than the input (by radius on each side)"""
    diameter = radius * 2 + 1
    win_sum = windowed_sum(arr, radius)[radius:-radius, radius:-radius]
    win_sum_2 = windowed_sum(arr * arr, radius)[radius:-radius, radius:-radius]
    return (win_sum_2 - win_sum * win_sum / diameter / diameter) / diameter / diameter

def windowed_std(arr: np.ndarray, radius: int) -> np.ndarray:
    output = np.full_like(arr, np.nan, dtype=np.float64)
    var_arr = windowed_var(arr, radius)
    std_arr = np.sqrt(var_arr)
    output[radius:-radius, radius:-radius] = std_arr
    return output
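On NaN-free data you can spot-check the alignment against np.std over the same centered window (a small sketch; the indices here are arbitrary):

a = np.random.rand(50, 50)
r = 3
out = windowed_std(a, r)
# out[i, j] is the std of the (2r+1) x (2r+1) window centered on pixel (i, j)
assert np.allclose(out[10, 20], np.std(a[10-r:10+r+1, 20-r:20+r+1]))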
This performs a little slower than the uniform_filter, but still MUCH faster than many other methods (stacking arrays, iteration, etc.):
>>> data = np.random.random((1024, 1024))
>>> %timeit windowed_std(data, 4)
158 ms ± 695 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
For comparison, the uniform_filter approach takes around 36 ms on data of the same size.
With some NaN's:
data = np.arange(100, dtype=np.float64).reshape(10, 10)
data[3:4, 3:4] = np.nan
windowed_std(data, 1)
array([[ nan, nan, nan, nan, nan, nan, nan, nan, nan, nan],
[ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, nan, nan, nan, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, nan, nan, nan, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, nan, nan, nan, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, 8.21, nan],
[ nan, nan, nan, nan, nan, nan, nan, nan, nan, nan]])
Upvotes: 0
Reputation: 67447
The most often used method to do this kind of thing in image processing is the summed area table, an idea introduced by Crow's 1984 paper. The idea is that, when you compute a quantity by adding over a window and then move the window, e.g. one pixel to the right, you don't need to add up all the items in the new window; you only need to subtract the leftmost column from the total and add the new rightmost column. So if you create an accumulated sum array over both dimensions of your array, you can get the sum over any window with a couple of additions and subtractions. If you keep summed area tables for your array and for its square, it's very easy to get the variance from those two. Here's an implementation:
def windowed_sum(a, win):
    table = np.cumsum(np.cumsum(a, axis=0), axis=1)
    win_sum = np.empty(tuple(np.subtract(a.shape, win - 1)))
    win_sum[0, 0] = table[win-1, win-1]
    win_sum[0, 1:] = table[win-1, win:] - table[win-1, :-win]
    win_sum[1:, 0] = table[win:, win-1] - table[:-win, win-1]
    win_sum[1:, 1:] = (table[win:, win:] + table[:-win, :-win] -
                       table[win:, :-win] - table[:-win, win:])
    return win_sum

def windowed_var(a, win):
    win_a = windowed_sum(a, win)
    win_a2 = windowed_sum(a*a, win)
    return (win_a2 - win_a * win_a / win / win) / win / win
To see that this works:
>>> a = np.arange(25).reshape(5,5)
>>> windowed_var(a, 3)
array([[ 17.33333333, 17.33333333, 17.33333333],
[ 17.33333333, 17.33333333, 17.33333333],
[ 17.33333333, 17.33333333, 17.33333333]])
>>> np.var(a[:3, :3])
17.333333333333332
>>> np.var(a[-3:, -3:])
17.333333333333332
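To get the standard deviation rather than the variance, just take the square root of the result. A small sketch (arbitrary sizes, reusing windowed_var from above) to spot-check it against np.std:

a = np.random.rand(100, 100)
win = 7
win_std = np.sqrt(windowed_var(a, win))   # shape (100 - win + 1, 100 - win + 1)
# entry [i, j] corresponds to the window a[i:i+win, j:j+win]
assert np.allclose(win_std[10, 20], np.std(a[10:10+win, 20:20+win]))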
This should run a couple of notches faster than convolution based methods.
Upvotes: 13
Reputation: 19547
You can first obtain the indices and then use np.take to form the new array:
def new_std_dev(image_original, radius=5):
    cols, rows = image_original.shape

    # First obtain the indices for the top left position
    diameter = np.arange(radius*2)
    x, y = np.meshgrid(diameter, diameter)
    index = np.ravel_multi_index((y, x), (cols, rows)).ravel()

    # Cast this in two dimensions and take the stdev
    index = index + np.arange(rows - radius*2)[:, None] + np.arange(cols - radius*2)[:, None, None]*(rows)
    data = np.std(np.take(image_original, index), -1)

    # Add the zeros back to the output array
    top = np.zeros((radius, rows - radius*2))
    sides = np.zeros((cols, radius))
    data = np.vstack((top, data, top))
    data = np.hstack((sides, data, sides))
    return data
First generate some random data and check timings:
a=np.random.rand(50,20)
print(np.allclose(new_std_dev(a), sliding_std_dev(a)))
True
%timeit sliding_std_dev(a)
100 loops, best of 3: 18 ms per loop
%timeit new_std_dev(a)
1000 loops, best of 3: 472 us per loop
For larger arrays its always faster as long as you have enough memory:
a=np.random.rand(200,200)
print(np.allclose(new_std_dev(a), sliding_std_dev(a)))
True
%timeit sliding_std_dev(a)
1 loops, best of 3: 1.58 s per loop
%timeit new_std_dev(a)
10 loops, best of 3: 52.3 ms per loop
The original function is faster for very small arrays; it looks like the break-even point is when hgt*wdt > 50. Something to note: your function takes square frames and places the std dev at the bottom-right index rather than sampling around the index. Is this intentional?
Upvotes: 1
Reputation: 284752
First off, there's more than one way to do this.
It's not the most efficient speed-wise, but using scipy.ndimage.generic_filter will allow you to easily apply an arbitrary python function over a moving window.
As a quick example:
result = scipy.ndimage.generic_filter(data, np.std, size=2*radius)
Note that the boundary conditions can be controlled by the mode kwarg.
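The timings further down call an ndimage_std_dev function; a minimal sketch of such a wrapper, assuming it is just the one-liner above with default boundary handling, would be:

import numpy as np
import scipy.ndimage

def ndimage_std_dev(data, radius=5):
    # hypothetical wrapper around the generic_filter one-liner above
    return scipy.ndimage.generic_filter(data, np.std, size=2*radius)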
Another way to do this is to use various striding tricks to make a view of the array that's effectively a moving window, and then apply np.std along the last axis. (Note: this is taken from one of my previous answers here: https://stackoverflow.com/a/4947453/325565)
def strided_sliding_std_dev(data, radius=5):
    windowed = rolling_window(data, (2*radius, 2*radius))
    shape = windowed.shape
    windowed = windowed.reshape(shape[0], shape[1], -1)
    return windowed.std(axis=-1)

def rolling_window(a, window):
    """Takes a numpy array *a* and a sequence of (or single) *window* lengths
    and returns a view of *a* that represents a moving window."""
    if not hasattr(window, '__iter__'):
        return rolling_window_lastaxis(a, window)
    for i, win in enumerate(window):
        if win > 1:
            a = a.swapaxes(i, -1)
            a = rolling_window_lastaxis(a, win)
            a = a.swapaxes(-2, i)
    return a

def rolling_window_lastaxis(a, window):
    """Directly taken from Erik Rigtorp's post to numpy-discussion.
    <http://www.mail-archive.com/[email protected]/msg29450.html>"""
    if window < 1:
        raise ValueError("`window` must be at least 1.")
    if window > a.shape[-1]:
        raise ValueError("`window` is too long.")
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
It's a bit hard to understand what's going on here at first glance. Not to plug one of my own answers, but I don't want to re-type the explanation, so have a look here: https://stackoverflow.com/a/4924433/325565 if you haven't seen these sorts of "striding" tricks before.
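As a quick illustration of what rolling_window gives you (a small sketch, just to show the shape of the view it returns):

import numpy as np

a = np.arange(16).reshape(4, 4)
w = rolling_window(a, (2, 2))
print(w.shape)   # (3, 3, 2, 2): one 2x2 window per valid top-left corner
print(w[0, 0])   # the top-left 2x2 window of `a`: rows [0, 1] and [4, 5]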
If we compare timings with a 100x100 array of random floats and a radius of 5, it's ~10x faster than the original or the generic_filter version. However, you have no flexibility in the boundary conditions with this version. (It's identical to what you're currently doing, while the generic_filter version gives you lots of flexibility at the expense of speed.)
# Your original function with nested loops
In [21]: %timeit sliding_std_dev(data)
1 loops, best of 3: 237 ms per loop
# Using scipy.ndimage.generic_filter
In [22]: %timeit ndimage_std_dev(data)
1 loops, best of 3: 244 ms per loop
# The "stride-tricks" version above
In [23]: %timeit strided_sliding_std_dev(data)
100 loops, best of 3: 15.4 ms per loop
# Ophion's version that uses `np.take`
In [24]: %timeit new_std_dev(data)
100 loops, best of 3: 19.3 ms per loop
The downside to the "stride-tricks" version is that unlike "normal" strided rolling window tricks, this version does make a copy, and it's much larger than the original array. You will run into memory problems if you use this on a large array! (On a side note, it's basically equivalent to @Ophion's answer in terms of memory use and speed. It's just a different approach to doing the same thing.)
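To get a feel for how large that copy can be, a rough back-of-the-envelope sketch (the sizes here are just an example):

# For an (H, W) array and a (2r, 2r) window, the reshaped copy holds
# (H - 2r + 1) * (W - 2r + 1) * (2r)**2 elements.
H, W, r = 1024, 1024, 20
n_elem = (H - 2*r + 1) * (W - 2*r + 1) * (2*r)**2
print(n_elem * 8 / 1e9, "GB as float64")   # roughly 12 GB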
Upvotes: 3