danicbzs
danicbzs

Reputation: 11

Real-time peak detection in time series

I am currently working on a project that consists in finding real-time accurate peaks from a random given signal. As you can see, I am using the find_peaks_cwt function for this task, but it does not give me accurate results plus it lags too much and repaints. What should I do in order to improve this? PD: Is there any way to find real-time valleys also? Here is the code:

import random
import matplotlib.pyplot as plt
from itertools import count
from matplotlib.animation import FuncAnimation
from scipy.signal import find_peaks_cwt
import numpy as np

def random_timeseries(initial_value: float, volatility: float, count: int) -> list:
    time_series = [initial_value, ]
    for _ in range(count):
        time_series.append(time_series[-1] + initial_value * random.gauss(0, 1) * volatility)
    return time_series

ts = random_timeseries(1.2, 0.15, 1000)

x_data = []
y_data = []

index = count()

def animate(i):
    current_index = next(index)
    x_data.append(current_index)
    y_data.append(ts[current_index])
    peaks = find_peaks_cwt(y_data, np.arange(1,10))
    plt.cla()
    plt.plot(x_data, y_data)
    plt.plot(peaks, [y_data[i] for i in peaks], "x")
    plt.title("Random Time Series with Peaks")
    plt.xlabel("Time")
    plt.ylabel("Value")

ani = FuncAnimation(plt.gcf(), animate, interval=100)

plt.tight_layout()
plt.show()

I've tried to improve this by using a ricker wavelet in order to filter the signal before applying the find_peaks_cwt function, but haven't got better results.

Upvotes: 1

Views: 616

Answers (1)

miriam mazzeo
miriam mazzeo

Reputation: 403

I have used function find_peaks function and it works much better:

import random
import matplotlib.pyplot as plt
from itertools import count
from matplotlib.animation import FuncAnimation
from scipy.signal import find_peaks_cwt, find_peaks
import numpy as np

def random_timeseries(initial_value: float, volatility: float, count: int) -> list:
    time_series = [initial_value, ]
    for _ in range(count):
        time_series.append(time_series[-1] + initial_value * random.gauss(0, 1) * volatility)
    return time_series

ts = random_timeseries(1.2, 0.15, 1000)

x_data = []
y_data = []

index = count()

def animate(i):
    current_index = next(index)
    x_data.append(current_index)
    y_data.append(ts[current_index])
    peaks, heights = find_peaks(y_data, height=0)
    plt.cla()
    plt.plot(x_data, y_data)
    plt.plot(peaks, heights['peak_heights'], "x")
    plt.title("Random Time Series with Peaks")
    plt.xlabel("Time")
    plt.ylabel("Value")

ani = FuncAnimation(plt.gcf(), animate, interval=100)

plt.tight_layout()
plt.show()
ani.save('anim.png')

You can change Height parameter with height=(None, None) to select peaks below zero as well.

enter image description here

Upvotes: 1

Related Questions