Reputation: 59416
I have an arbitrary input curve, given as numpy array. I want to create a smoothed version of it, similar to a rolling mean, but which is strictly greater than the original and strictly smooth. I could use the rolling mean value but if the input curve has a negative peak, the smoothed version will drop below the original around that peak. I could then simply use the maximum of this and the original but that would introduce non-smooth spots where the transition occurs.
Furthermore, I would like to be able to parameterize the algorithm with a look-ahead and a look-behind for this resulting curve, so that given a large look-ahead and a small look-behind the resulting curve would rather stick to the falling edges, and with a large look-behind and a small look-ahead it would rather be close to rising edges.
I tried using the pandas.Series(a).rolling() facility to get rolling means, rolling maxima, etc., but so far I have found no way to generate a smoothed version of my input which in all cases stays above the input.
I guess there is a way to combine rolling maxima and rolling means somehow to achieve what I want, so here is some code for computing these:
import pandas as pd
import numpy as np
My input curve:
original = np.array([ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7 ])
This can be padded left (pre) and right (post) with the edge values as a preparation for any rolling function:
pre = 2
post = 3
padded = np.pad(original, (pre, post), 'edge')
Now we can apply a rolling mean:
smoothed = pd.Series(padded).rolling(
    pre + post + 1).mean().to_numpy()[pre+post:]
But now the smoothed version is below the original, e.g. at index 4:
print(original[4], smoothed[4]) # 8 and 5.5
To compute a rolling maximum, you can use this:
maximum = pd.Series(padded).rolling(
    pre + post + 1).max().to_numpy()[pre+post:]
But a rolling maximum alone would of course not be smooth in many cases and would display a lot of flat tops around the peaks of the original. I would prefer a smooth approach to these peaks.
If you also have pyqtgraph installed, you can easily plot such curves:
import pyqtgraph as pg
p = pg.plot(original)
p.plotItem.plot(smoothed, pen=(255,0,0))
(Of course, other plot libraries would do as well.)
What I would like to have as a result is a curve which is e.g. like the one formed by these values:
goal = np.array([ 5, 7, 7.8, 8, 8, 8, 7, 5, 3.5, 3, 4, 5.5, 6.5, 7 ])
Here is an image of the curves. The white line is the original (input), the red the rolling mean, the green is about what I would like to have:
EDIT: I just found the functions baseline() and envelope() of a module named peakutils. These two functions can compute polynomials of a given degree fitting the lower resp. upper peaks of the input. For small samples this can be a good solution. However, I'm looking for something which can also be applied to very large samples with millions of values; the degree would then need to be very high, and the computation would take a considerable amount of time. Doing it piecewise (section by section) opens up a bunch of new questions and problems (like how to stitch properly while staying smooth and guaranteed above the input, performance when processing a massive number of pieces, etc.), so I'd like to avoid that if possible.
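For reference, a minimal sketch of how these two functions could be called, assuming peakutils is installed (parameter names as I recall them from its docs; the degree of 7 is an arbitrary choice):

import numpy as np
import peakutils

y = np.repeat([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], 100).astype(float)
upper = peakutils.envelope(y, deg=7)  # polynomial fit through the upper peaks
lower = peakutils.baseline(y, deg=7)  # polynomial fit through the lower peaks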
EDIT 2: I have a promising approach: repetitively applying a filter which creates a rolling mean, shifts it slightly to the left and to the right, and then takes the maximum of these two and the original sample. After applying this several times, it smooths out the curve in the way I wanted. Some unsmooth spots can remain in deep valleys, though. Here is the code for this:
pre = 30
post = 30
margin = 10
# input signal: each sample value repeated 100 times
s = [ np.array(sum([ [ x ] * 100 for x in
    [ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7 ] ], [])) ]
for _ in range(30):
    # take the maximum of a backward-shifted rolling mean,
    # a forward-shifted rolling mean and the previous curve
    s.append(np.max([
        pd.Series(np.pad(s[-1], (margin+pre, post), 'edge')).rolling(
            1 + pre + post).mean().to_numpy()[pre+post:-margin],
        pd.Series(np.pad(s[-1], (pre, post+margin), 'edge')).rolling(
            1 + pre + post).mean().to_numpy()[pre+post+margin:],
        s[-1]], 0))
This runs 30 iterations of the filter; plotting them can be done with pyqtgraph like so:
p = pg.plot(original)
for q in s:
    p.plotItem.plot(q, pen=(255, 100, 100))
The resulting image looks like this:
There are two aspects I don't like about this approach: ① it needs lots of iterations (which slows me down), ② it still has unsmooth parts in the valleys (although in my use case this might be acceptable).
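For easier experimentation, the loop above could be packaged into a function. This is just a sketch (smooth_above is a name I made up); the body is the same iteration as above:

def smooth_above(signal, pre=30, post=30, margin=10, iterations=30):
    # repeatedly take the maximum of the curve and two slightly
    # shifted rolling means, letting the result creep above the input
    result = np.asarray(signal, dtype=float)
    for _ in range(iterations):
        left = pd.Series(np.pad(result, (margin + pre, post), 'edge')).rolling(
            1 + pre + post).mean().to_numpy()[pre + post:-margin]
        right = pd.Series(np.pad(result, (pre, post + margin), 'edge')).rolling(
            1 + pre + post).mean().to_numpy()[pre + post + margin:]
        result = np.max([left, right, result], 0)
    return result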
Upvotes: 3
Views: 3069
Reputation: 59416
I have now played around quite a bit and I think I found two main approaches which solve my immediate need. I will present them below.
import numpy as np
import pandas as pd
from scipy import signal
import pyqtgraph as pg
These are just the necessary imports, used in all the code below. pyqtgraph is only used for displaying things, of course, so you do not really need it.
This first, symmetrical approach can be used to create a smooth line which is always above the signal, but it cannot distinguish between rising and falling edges, so the curve around a single peak will look symmetrical. In many cases this might be quite okay, since it is far less complex than the asymmetrical solution below (and it also has no quirks that I know of). Because the maximum and the averaging windows have the same width and pre equals post, every sample of the windowed mean averages only rolling-maximum values whose windows all contain the original sample, so the result stays above the input by construction.
# noisy test signal
s = np.repeat([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], 400) + 0.1
s *= np.random.random(len(s))
pre = post = 400
# x: rolling maximum over a symmetric window
x = pd.Series(np.pad(s, (pre, post), 'edge')).rolling(
    pre + 1 + post).max().to_numpy()[pre+post:]
# y: the maximum curve smoothed with a Blackman window of the same width
y = pd.Series(np.pad(x, (pre, post), 'edge')).rolling(
    pre + 1 + post, win_type='blackman').mean().to_numpy()[pre+post:]
p = pg.plot(s, pen=(100,100,100))
for c, pen in ((x, (0, 200, 200)),
               (y, pg.mkPen((255, 255, 255), width=3, style=3))):
    p.plotItem.plot(c, pen=pen)
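A quick numerical sanity check that the smoothed curve really never drops below the signal:

print(np.all(y >= s))  # expected: True (up to floating-point rounding)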
My use case called for a version which allows distinguishing between rising and falling edges: the output should approach the signal at a different speed when it is falling than when it is rising.
Comment: Used as an envelope for a compressor/expander, a quickly rising curve would mean dampening the effect of a sudden loud noise almost completely, while a slowly rising curve would mean slowly compressing the signal for a long time before the loud noise, keeping the dynamics when the bang appears. On the other hand, if the curve falls quickly after a loud noise, quiet passages shortly after a bang become audible, while a slowly falling curve would keep the dynamics there as well and only slowly expand the signal back to normal levels.
# noisy test signal
s = np.repeat([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], 400) + 0.1
s *= np.random.random(len(s))
pre, post = 100, 1000
# t: rolling maximum; the swapped padding shifts the window so that it
# covers `post` samples behind and `pre` samples ahead of each point
t = pd.Series(np.pad(s, (post, pre), 'edge')).rolling(
    pre + 1 + post).max().to_numpy()[pre+post:]
# u: backward-looking boxcar average of t
g = signal.get_window('boxcar', pre*2)[pre:]
g /= g.sum()
u = np.convolve(np.pad(t, (pre, 0), 'edge'), g)[pre:]
# v: forward-looking boxcar average of t
g = signal.get_window('boxcar', post*2)[:post]
g /= g.sum()
v = np.convolve(np.pad(t, (0, post), 'edge'), g)[post:]
u, v = u[:len(v)], v[:len(u)]
# w: pointwise minimum of the two one-sided averages
w = np.min(np.array([ u, v ]), 0)
# x, y: final symmetric rolling maximum plus Blackman-windowed mean
pre = post = max(100, min(pre, post)*3)
x = pd.Series(np.pad(w, (pre, post), 'edge')).rolling(
    pre + 1 + post).max().to_numpy()[pre+post:]
y = pd.Series(np.pad(x, (pre, post), 'edge')).rolling(
    pre + 1 + post, win_type='blackman').mean().to_numpy()[pre+post:]
p = pg.plot(s, pen=(100,100,100))
for c, pen in ((t, (200, 0, 0)),
               (u, (200, 200, 0)),
               (v, (0, 200, 0)),
               (w, (200, 0, 200)),
               (x, (0, 200, 200)),
               (y, pg.mkPen((255, 255, 255), width=3))):
    p.plotItem.plot(c, pen=pen)
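The same numerical check is worth running here, since the asymmetric chain does not come with the simple symmetric-window argument from above:

print(np.all(y >= s))  # verify the envelope property for this parameter set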
This sequence ruthlessly combines several methods of signal processing.
Upvotes: 1
Reputation: 2302
As pointed out in my note, the behaviour of your green line is inconsistent in the regions before and after the eight-high plateau. If the left region behavior is what you want, you could do something like this:
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.spatial import ConvexHull
# %matplotlib inline # for interactive notebooks
y=np.array([ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7])
x=np.array(range(len(y)))
#######
# This essentially selects the vertices that you'd touch stretching a
# rubber band over the top of the function
vs = ConvexHull(np.asarray([x,y]).transpose()).vertices
indices_of_upper_hull_verts = list(reversed(np.concatenate([vs[np.where(vs == len(x)-1)[0][0]: ],vs[0:1]])))
newX = x[indices_of_upper_hull_verts]
newY = y[indices_of_upper_hull_verts]
#########
x_smooth = np.linspace(newX.min(), newX.max(),500)
f = interp1d(newX, newY, kind='quadratic')
y_smooth=f(x_smooth)
plt.plot (x,y)
plt.plot (x_smooth,y_smooth)
plt.scatter (x, y)
which yields:
UPDATE:
Here's an alternative that might suit you better. If, instead of a rolling average, you use a simple convolution whose central tap is exactly 1, the resulting curve will always be at least as large as the input (for non-negative input), because each output sample is the corresponding input sample plus non-negative contributions from its neighbours. The wings of the convolution kernel can be adjusted for look-ahead/look-behind.
Like this:
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from scipy.ndimage import convolve
## For interactive notebooks
#%matplotlib inline
y = np.array([ 5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7]).astype(float)
preLength = 1
postLength = 1
preWeight = 0.2
postWeight = 0.2
kernel = [preWeight/preLength for i in range(preLength)] + [1] + [postWeight/postLength for i in range(postLength)]
output = convolve(y, kernel)
x=np.array(range(len(y)))
plt.plot (x,y)
plt.plot (x,output)
plt.scatter (x, y)
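A quick check that the convolved curve stays at or above the input, which holds here because the input is non-negative and the centre tap of the kernel is exactly 1:

print(np.all(output >= y))  # True for non-negative input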
A drawback is that, because the integrated kernel will typically be larger than one (which is what ensures the output curve is smooth and never below the input), the output curve will always be somewhat larger than the input curve, e.g. it floats above the large peak instead of sitting right on top of it as you drew.
Upvotes: 0
Reputation: 19750
As an initial stab at part of the problem, I've produced a function which fits a polynomial to the data by minimising its integral, subject to the constraint that the polynomial be above the points everywhere. I suspect that if you do this piecewise over your data, it may work for you.
import numpy as np
import scipy.optimize

def upperpoly(xdata, ydata, order):
    def objective(p):
        """Minimize the integral of the polynomial over the data range"""
        pint = np.polyint(p)
        integral = np.polyval(pint, xdata[-1]) - np.polyval(pint, xdata[0])
        return integral

    def constraints(p):
        """Polynomial values must be >= data at every point"""
        return np.polyval(p, xdata) - ydata

    # start from a least-squares fit, shifted upwards until it clears the data
    p0 = np.polyfit(xdata, ydata, order)
    y0 = np.polyval(p0, xdata)
    shift = (ydata - y0).max()
    p0[-1] += shift
    result = scipy.optimize.minimize(objective, p0,
                                     constraints={'type': 'ineq',
                                                  'fun': constraints})
    return result.x
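A quick way to try it on the question's sample data (hypothetical usage; the order of 5 is an arbitrary choice):

xdata = np.arange(14, dtype=float)
ydata = np.array([5, 5, 5, 8, 8, 8, 2, 2, 2, 2, 2, 3, 3, 7], dtype=float)
coeffs = upperpoly(xdata, ydata, order=5)
fitted = np.polyval(coeffs, xdata)
print(np.all(fitted >= ydata))  # should print True, up to solver tolerance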
Upvotes: 0