JMDE
JMDE

Reputation: 1105

How to implement continuous time high/low pass filter in python?

I have a application that recieves incoming signals with a fixed frequency. I have been trying to implement a filter for this incoming signal without having to save N timesteps and performing a filtering-function on that. What I would like to do is akin to a one-dimensional Kalman filter where I update my current state with the new observation. I am no mathematician, and thus the Wikipedia pages regarding if this is possible is completely impenetrable to me. Furthermore, the StackOverflow answers (I've found) in this domain only regards if you have a section of N timesteps of signal available and how to perform a filtering on that, and such filtering I can do without a problem.

I've provided some dummy code below that illustrates the types of functions I have been trying to write.

import numpy as np
import matplotlib.pyplot as plt

def high_pass(new, previous, cutoff=20):
    # Howto?
    return 0

def low_pass(new, previous, cutofff=20):
    # Howto?
    return 0

def continuous_filter(vs):
    prev_highpass, prev_lowpass = 0, 0
    for v in vs:
        prev_highpass = high_pass(v, prev_highpass)
        prev_lowpass = low_pass(v, prev_lowpass)
        yield prev_highpass, prev_lowpass


np.random.seed(21)
sec_duration = 10.0
time_resolution = 1e3
dt = 1/time_resolution
steps = int(sec_duration * time_resolution)
signal = np.sum([np.sin(np.linspace(0, np.random.randint(10, 100), steps)) * np.random.random() for _ in range(3)], axis=0)

filt_signals = np.array([[high, low] for high, low in continuous_filter(signal)])

plt.plot(signal, label="Input signal")
plt.plot(filt_signals[:, 0], label="High-pass")
plt.plot(filt_signals[:, 1], label="Low pass")
plt.legend()
plt.show()

enter image description here

Anyone that can tell me if it is possible? I have been looking at the SciPy documentation but I dont understand it.

Upvotes: 0

Views: 4914

Answers (2)

slipstream
slipstream

Reputation: 21

This is a good answer Dorian, thank you. After years of shadowing stack overflow, this will be my first contribution. It is minor, hope it helps extend the use cases. Tried to keep the original notation as much as possible so it is traceable.

Only two imports are needed

'''python
import numpy as np
import matplotlib.pyplot as plt

Minor correction to the rc_high_pass alpha value

def rc_low_pass(x_new, y_old, sample_rate_hz, lowpass_cutoff_hz):
    dt = 1/sample_rate_hz
    rc = 1/(2*np.pi*lowpass_cutoff_hz)
    alpha = dt/(rc + dt)
    y_new = x_new * alpha + (1 - alpha) * y_old
    return y_new


def rc_high_pass(x_new, x_old, y_old, sample_rate_hz, highpass_cutoff_hz):
    dt = 1/sample_rate_hz
    rc = 1/(2*np.pi*highpass_cutoff_hz)
    alpha = rc/(rc + dt)
    y_new = alpha * (y_old + x_new - x_old)
    return y_new


def rc_filters(xs, sample_rate_hz, 
               highpass_cutoff_hz, 
               lowpass_cutoff_hz):
    # Initialize. This can be improved to match wikipedia.
    x_prev = 0
    y_prev_high = 0
    y_prev_low = 0

    for x in xs:
        y_prev_high = rc_high_pass(x, x_prev, y_prev_high, sample_rate_hz, 
                                   highpass_cutoff_hz)
        y_prev_low = rc_low_pass(x, y_prev_low, sample_rate_hz, 
                                 lowpass_cutoff_hz)
        x_prev = x
        yield y_prev_high, y_prev_low

And now to the main, where I use physical units in hertz and seconds. Kindly excuse the base 2 number representation, it helps set the stage for ffts.

if __name__ == "__main__":
    """
    # RC filters for continuous signals
    """
    sample_rate = 2**13  # Close to 8 kHz
    duration_points = 2**10
    sec_duration = duration_points/sample_rate

    frequency_low = sample_rate/2**9
    frequency_high = sample_rate/2**3

    # Design the cutoff
    number_octaves = 3
    highpass_cutoff = frequency_high/2**number_octaves
    lowpass_cutoff = frequency_low*2**number_octaves

    print('Two-tone test')
    print('Sample rate, Hz:', sample_rate)
    print('Record duration, s:', sec_duration)
    print('Low, high tone frequency:', frequency_low, frequency_high)

    time_s = np.arange(duration_points)/sample_rate

    sig = np.sin(2*np.pi*frequency_low*time_s) + \
          np.sin(2*np.pi*frequency_high*time_s)

    filt_signals = np.array([[high, low]
                             for high, low in
                             rc_filters(sig, sample_rate,
                                        highpass_cutoff, lowpass_cutoff)])

The input signal is different and closer to my typical use case, and the resulting plot shows that the usual RC filter is not very fast. It is a classic solution, and it is good to have it here.

plt.plot(sig, label="Input signal")
plt.plot(filt_signals[:, 0], label="High-pass")
plt.plot(filt_signals[:, 1], label="Low-pass")
plt.title("RC Low-pass and High-pass Filter Response")
plt.legend()
plt.show()

Two-tone input, 6 octave separation; substantial spectral leakage.

Upvotes: 2

Dorian
Dorian

Reputation: 1488

First, you need a time constant for your filter from the cutoff: alpha = dt / (RC + dt) and cutoff = 1 / (2 * pi * RC).

You need this factor to compute the next filtered value:

def low_pass(x_new, y_old, cutoff=20):
    alpha = dt / (dt + 1 / (2 * np.pi * cutoff))
    y_new = x_new * alpha + (1 - alpha) * y_old
    return y_new

From wikipedia: low-pass.

def high_pass(x_new, x_old, y_old, cutoff=20):
    alpha = dt / (dt + 1 / (2 * np.pi * cutoff))
    y_new = alpha * (y_old + x_new - x_old)
    return y_new 

From wikipedia: high-pass.

def continuous_filter(xs):
    prev_highpass, prev_lowpass = 0, 0
    x_prev = 0  # need initializatoin for highpass
    y_prev_high = 0  # initialization
    y_prev_low = 0  # initialization

    for x in xs:
        y_prev_high = high_pass(x, x_prev, y_prev_high)
        y_prev_low = low_pass(x, y_prev_low)
        x_prev = x
        yield y_prev_high, y_prev_low


np.random.seed(21)
sec_duration = 10.0
time_resolution = 1e3
dt = 1/time_resolution

steps = int(sec_duration * time_resolution)
signal = np.sum([np.sin(np.linspace(0, np.random.randint(10, 100), steps)) * np.random.random() for _ in range(3)], axis=0)

filt_signals = np.array([[high, low] for high, low in continuous_filter(signal)])


Upvotes: 2

Related Questions