joel
joel

Reputation: 1186

Finding anomalous values from sinusoidal data

How can I find anomalous values from following data. I am simulating a sinusoidal pattern. While I can plot the data and spot any anomalies or noise in data, but how can I do it without plotting the data. I am looking for simple approaches other than Machine learning methods.

import random 
import numpy as np 
import matplotlib.pyplot as plt 

N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)
print("in_array : ", in_array)
out_array = np.sin(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

enter image description here

Inject random noise

noise_input = random.uniform(-.5, .5); print("Noise : ",noise_input)

in_array[random.randint(0,len(in_array)-1)] = noise_input
print(in_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

Data with noise

enter image description here

Upvotes: 0

Views: 1103

Answers (2)

andreihondrari
andreihondrari

Reputation: 5833

I've thought of the following approach to your problem, since you have only some values that are anomalous in the time vector, it means that the rest of the values have a regular progression, which means that if we gather all the data points in the vector under clusters and calculate the average step for the biggest cluster (which is essentially the pool of values that represent the real deal), then we can use that average to do a triad detection, in a given threshold, over the vector and detect which of the elements are anomalous.

For this we need two functions: calculate_average_step which will calculate that average for the biggest cluster of close values, and then we need detect_anomalous_values which will yield the indexes of the anomalous values in our vector, based on that average calculated earlier.

After we detected the anomalous values, we can go ahead and replace them with an estimated value, which we can determine from our average step value and by using the adjacent points in the vector.

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 


def calculate_average_step(array, threshold=5):
    """
    Determine the average step by doing a weighted average based on clustering of averages.
    array: our array
    threshold: the +/- offset for grouping clusters. Aplicable on all elements in the array. 
    """

    # determine all the steps
    steps = []
    for i in range(0, len(array) - 1):
        steps.append(abs(array[i] - array[i+1]))

    # determine the steps clusters
    clusters = []
    skip_indexes = []
    cluster_index = 0

    for i in range(len(steps)):
        if i in skip_indexes:
            continue

        # determine the cluster band (based on threshold)
        cluster_lower = steps[i] - (steps[i]/100) * threshold
        cluster_upper = steps[i] + (steps[i]/100) * threshold

        # create the new cluster
        clusters.append([])
        clusters[cluster_index].append(steps[i])

        # try to match elements from the rest of the array
        for j in range(i + 1, len(steps)):

            if not (cluster_lower <= steps[j] <= cluster_upper):
                continue

            clusters[cluster_index].append(steps[j])
            skip_indexes.append(j)

        cluster_index += 1  # increment the cluster id

    clusters = sorted(clusters, key=lambda x: len(x), reverse=True)
    biggest_cluster = clusters[0] if len(clusters) > 0 else None

    if biggest_cluster is None:
        return None

    return sum(biggest_cluster) / len(biggest_cluster)  # return our most common average


def detect_anomalous_values(array, regular_step, threshold=5):
    """
    Will scan every triad (3 points) in the array to detect anomalies.
    array: the array to iterate over.
    regular_step: the step around which we form the upper/lower band for filtering
    treshold: +/- variation between the steps of the first and median element and median and third element.
    """
    assert(len(array) >= 3)  # must have at least 3 elements

    anomalous_indexes = []

    step_lower = regular_step - (regular_step / 100) * threshold
    step_upper = regular_step + (regular_step / 100) * threshold

    # detection will be forward from i (hence 3 elements must be available for the d)
    for i in range(0, len(array) - 2):
        a = array[i]
        b = array[i+1]
        c = array[i+2]

        first_step = abs(a-b)
        second_step = abs(b-c)

        first_belonging = step_lower <= first_step <= step_upper
        second_belonging = step_lower <= second_step <= step_upper

        # detect that both steps are alright
        if first_belonging and second_belonging:
            continue  # all is good here, nothing to do

        # detect if the first point in the triad is bad
        if not first_belonging and second_belonging:
            anomalous_indexes.append(i)

        # detect the last point in the triad is bad
        if first_belonging and not second_belonging:
            anomalous_indexes.append(i+2)

        # detect the mid point in triad is bad (or everything is bad)
        if not first_belonging and not second_belonging:
            anomalous_indexes.append(i+1)
            # we won't add here the others because they will be detected by
            # the rest of the triad scans

    return sorted(set(anomalous_indexes))  # return unique indexes

if __name__ == "__main__":

    N = 10                  # Set signal sample length
    t1 = -np.pi             # Simulation begins at t1
    t2 =  np.pi;            # Simulation  ends  at t2

    in_array = np.linspace(t1, t2, N)

    # add some noise
    noise_input = random.uniform(-.5, .5);
    in_array[random.randint(0, len(in_array)-1)] = noise_input
    noisy_out_array = np.sin(in_array)

    # display noisy sin
    plt.figure()
    plt.plot(in_array, noisy_out_array, color = 'red', marker = "o");
    plt.title("noisy numpy.sin()")

    # detect anomalous values
    average_step = calculate_average_step(in_array)
    anomalous_indexes = detect_anomalous_values(in_array, average_step)

    # replace anomalous points with an estimated value based on our calculated average
    for anomalous in anomalous_indexes:

        # try forward extrapolation
        try:
            in_array[anomalous] = in_array[anomalous-1] + average_step
        # else try backwward extrapolation
        except IndexError:
            in_array[anomalous] = in_array[anomalous+1] - average_step

    # generate sine wave
    out_array = np.sin(in_array)

    plt.figure()
    plt.plot(in_array, out_array, color = 'green', marker = "o");
    plt.title("cleaned numpy.sin()")

    plt.show()

Noisy sine:

noisy sine

Cleaned sine:

cleaned sine

Upvotes: 1

andreihondrari
andreihondrari

Reputation: 5833

Your problem relies in the time vector (which is of 1 dimension). You will need to apply some sort of filter on that vector.

First thing that came to mind was medfilt (median filter) from scipy and it looks something like this:

from scipy.signal import medfilt
l1 = [0, 10, 20, 30, 2, 50, 70, 15, 90, 100]
l2 = medfilt(l1)
print(l2)

the output of this will be:

[ 0. 10. 20. 20. 30. 50. 50. 70. 90. 90.]

the problem with this filter though is that if we apply some noise values to the edges of the vector like [200, 0, 10, 20, 30, 2, 50, 70, 15, 90, 100, -50] then the output would be something like [ 0. 10. 10. 20. 20. 30. 50. 50. 70. 90. 90. 0.] and obviously this is not ok for the sine plot since it will produce the same artifacts for the sine values array.

A better approach to this problem is to treat the time vector as an y output and it's index values as the x input and do a linear regression on the "time linear function", not the quotes, it just means we're faking the 2 dimensional model by applying a fake X vector. The code implies the use of scipy's linregress (linear regression) function:

from scipy.stats import linregress
l1 = [5, 0, 10, 20, 30, -20, 50, 70, 15, 90, 100]
l1_x = range(0, len(l1))

slope, intercept, r_val, p_val, std_err = linregress(l1_x, l1)
l1 = intercept + slope * l1_x

print(l1)

whose output will be:

[-10.45454545  -1.63636364   7.18181818  16.          24.81818182
  33.63636364  42.45454545  51.27272727  60.09090909  68.90909091
  77.72727273]

Now let's apply this to your time vector.

import random 
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
from scipy.stats import linregress

N = 20
# N = 10                  # Set signal sample length
t1 = -np.pi             # Simulation begins at t1
t2 =  np.pi;            # Simulation  ends  at t2

in_array = np.linspace(t1, t2, N)

# add some noise
noise_input = random.uniform(-.5, .5);
in_array[random.randint(0, len(in_array)-1)] = noise_input

# apply filter on time array
in_array_x = range(0, len(in_array))

slope, intercept, r_val, p_val, std_err = linregress(in_array_x, in_array)
in_array = intercept + slope * in_array_x

# generate sine wave
out_array = np.sin(in_array)
print("OUT ARRAY")
print(out_array)

plt.plot(in_array, out_array, color = 'red', marker = "o") ; plt.title("numpy.sin()")

plt.show()

the output will be:

linear regression on time vector for sine

the resulting signal will be an approximation of the original, as it is with any form of extrapolation/interpolation/regression filtering.

Upvotes: 0

Related Questions