Reputation: 1105
I have a application that recieves incoming signals with a fixed frequency. I have been trying to implement a filter for this incoming signal without having to save N timesteps and performing a filtering-function on that. What I would like to do is akin to a one-dimensional Kalman filter where I update my current state with the new observation. I am no mathematician, and thus the Wikipedia pages regarding if this is possible is completely impenetrable to me. Furthermore, the StackOverflow answers (I've found) in this domain only regards if you have a section of N timesteps of signal available and how to perform a filtering on that, and such filtering I can do without a problem.
I've provided some dummy code below that illustrates the types of functions I have been trying to write.
import numpy as np
import matplotlib.pyplot as plt
def high_pass(new, previous, cutoff=20):
# Howto?
return 0
def low_pass(new, previous, cutofff=20):
# Howto?
return 0
def continuous_filter(vs):
prev_highpass, prev_lowpass = 0, 0
for v in vs:
prev_highpass = high_pass(v, prev_highpass)
prev_lowpass = low_pass(v, prev_lowpass)
yield prev_highpass, prev_lowpass
np.random.seed(21)
sec_duration = 10.0
time_resolution = 1e3
dt = 1/time_resolution
steps = int(sec_duration * time_resolution)
signal = np.sum([np.sin(np.linspace(0, np.random.randint(10, 100), steps)) * np.random.random() for _ in range(3)], axis=0)
filt_signals = np.array([[high, low] for high, low in continuous_filter(signal)])
plt.plot(signal, label="Input signal")
plt.plot(filt_signals[:, 0], label="High-pass")
plt.plot(filt_signals[:, 1], label="Low pass")
plt.legend()
plt.show()
Anyone that can tell me if it is possible? I have been looking at the SciPy documentation but I dont understand it.
Upvotes: 0
Views: 4914
Reputation: 21
This is a good answer Dorian, thank you. After years of shadowing stack overflow, this will be my first contribution. It is minor, hope it helps extend the use cases. Tried to keep the original notation as much as possible so it is traceable.
Only two imports are needed
'''python
import numpy as np
import matplotlib.pyplot as plt
Minor correction to the rc_high_pass alpha value
def rc_low_pass(x_new, y_old, sample_rate_hz, lowpass_cutoff_hz):
dt = 1/sample_rate_hz
rc = 1/(2*np.pi*lowpass_cutoff_hz)
alpha = dt/(rc + dt)
y_new = x_new * alpha + (1 - alpha) * y_old
return y_new
def rc_high_pass(x_new, x_old, y_old, sample_rate_hz, highpass_cutoff_hz):
dt = 1/sample_rate_hz
rc = 1/(2*np.pi*highpass_cutoff_hz)
alpha = rc/(rc + dt)
y_new = alpha * (y_old + x_new - x_old)
return y_new
def rc_filters(xs, sample_rate_hz,
highpass_cutoff_hz,
lowpass_cutoff_hz):
# Initialize. This can be improved to match wikipedia.
x_prev = 0
y_prev_high = 0
y_prev_low = 0
for x in xs:
y_prev_high = rc_high_pass(x, x_prev, y_prev_high, sample_rate_hz,
highpass_cutoff_hz)
y_prev_low = rc_low_pass(x, y_prev_low, sample_rate_hz,
lowpass_cutoff_hz)
x_prev = x
yield y_prev_high, y_prev_low
And now to the main, where I use physical units in hertz and seconds. Kindly excuse the base 2 number representation, it helps set the stage for ffts.
if __name__ == "__main__":
"""
# RC filters for continuous signals
"""
sample_rate = 2**13 # Close to 8 kHz
duration_points = 2**10
sec_duration = duration_points/sample_rate
frequency_low = sample_rate/2**9
frequency_high = sample_rate/2**3
# Design the cutoff
number_octaves = 3
highpass_cutoff = frequency_high/2**number_octaves
lowpass_cutoff = frequency_low*2**number_octaves
print('Two-tone test')
print('Sample rate, Hz:', sample_rate)
print('Record duration, s:', sec_duration)
print('Low, high tone frequency:', frequency_low, frequency_high)
time_s = np.arange(duration_points)/sample_rate
sig = np.sin(2*np.pi*frequency_low*time_s) + \
np.sin(2*np.pi*frequency_high*time_s)
filt_signals = np.array([[high, low]
for high, low in
rc_filters(sig, sample_rate,
highpass_cutoff, lowpass_cutoff)])
The input signal is different and closer to my typical use case, and the resulting plot shows that the usual RC filter is not very fast. It is a classic solution, and it is good to have it here.
plt.plot(sig, label="Input signal")
plt.plot(filt_signals[:, 0], label="High-pass")
plt.plot(filt_signals[:, 1], label="Low-pass")
plt.title("RC Low-pass and High-pass Filter Response")
plt.legend()
plt.show()
Two-tone input, 6 octave separation; substantial spectral leakage.
Upvotes: 2
Reputation: 1488
First, you need a time constant for your filter from the cutoff: alpha = dt / (RC + dt)
and cutoff = 1 / (2 * pi * RC)
.
You need this factor to compute the next filtered value:
def low_pass(x_new, y_old, cutoff=20):
alpha = dt / (dt + 1 / (2 * np.pi * cutoff))
y_new = x_new * alpha + (1 - alpha) * y_old
return y_new
From wikipedia: low-pass.
def high_pass(x_new, x_old, y_old, cutoff=20):
alpha = dt / (dt + 1 / (2 * np.pi * cutoff))
y_new = alpha * (y_old + x_new - x_old)
return y_new
From wikipedia: high-pass.
def continuous_filter(xs):
prev_highpass, prev_lowpass = 0, 0
x_prev = 0 # need initializatoin for highpass
y_prev_high = 0 # initialization
y_prev_low = 0 # initialization
for x in xs:
y_prev_high = high_pass(x, x_prev, y_prev_high)
y_prev_low = low_pass(x, y_prev_low)
x_prev = x
yield y_prev_high, y_prev_low
np.random.seed(21)
sec_duration = 10.0
time_resolution = 1e3
dt = 1/time_resolution
steps = int(sec_duration * time_resolution)
signal = np.sum([np.sin(np.linspace(0, np.random.randint(10, 100), steps)) * np.random.random() for _ in range(3)], axis=0)
filt_signals = np.array([[high, low] for high, low in continuous_filter(signal)])
Upvotes: 2