Alex Michel

Reputation: 436

Resample and normalize series with Deedle

I have raw feed input from several sources that don't produce values at a fixed rate, and I need to resample and normalize it for further processing. Values are resampled to 500 ms intervals, using the average to aggregate multiple values in the same interval. Then a forward fill replaces missing values with the last known value, and a backward fill covers any missing values at the beginning of the data.

#raw feed
time          value     source
09:30:00.230     2         B
09:30:00.417     3         B
09:30:00.417     1         A
09:30:00.653     3         A
09:30:01.450     2         B
09:30:01.887     5         A
09:30:02.653     5         B
09:30:02.763     3         B
09:30:02.967     5         B
09:30:03.107     6         A
09:30:03.670     6         B

#resampled to 500ms intervals using average
time             A        B
09:30:00.000     NULL     2
09:30:00.500     2        3
09:30:01.000     NULL     NULL
09:30:01.500     NULL     2
09:30:02.000     5        NULL
09:30:02.500     NULL     5
09:30:03.000     6        4
09:30:03.500     NULL     6

#ffill+bfill
time             A     B
09:30:00.000     2     2
09:30:00.500     2     3
09:30:01.000     2     3
09:30:01.500     2     2
09:30:02.000     5     2
09:30:02.500     5     5
09:30:03.000     6     4
09:30:03.500     6     6

I used the following code, but I doubt it is an efficient way to use Deedle, and the resulting frame contains duplicate keys due to the full outer join. So now I need some way to aggregate them, or to split them back into series and resample them again? Please advise if there's a better way to meet the requirements.

private void Resample(IList<(DateTime time, double value, string source)> rawSource)
{
    var sourceASeries = rawSource.Where(x => x.source.ToLowerInvariant() == "a").Select(x => KeyValue.Create(x.time, x.value)).ToSeries();
    var sourceBSeries = rawSource.Where(x => x.source.ToLowerInvariant() == "b").Select(x => KeyValue.Create(x.time, x.value)).ToSeries();

    var sourceAResampled = sourceASeries.ResampleUniform(dt => dt.RoundMs(500), dt => dt.RoundMs(500).AddMilliseconds(500), Lookup.ExactOrSmaller);
    var sourceBResampled = sourceBSeries.ResampleUniform(dt => dt.RoundMs(500), dt => dt.RoundMs(500).AddMilliseconds(500), Lookup.ExactOrSmaller);

    var df = Frame.FromColumns(new[] { sourceAResampled, sourceBResampled });
    df = df.FillMissing(Direction.Forward).FillMissing(Direction.Backward);
}

In Python, using pandas, the following code works fine for me:

import pandas as pd

A_vals = vals.where(vals['Source'] == 'A', inplace=False).rename(columns={"Value": "A"}).drop(['Source'], axis=1)
B_vals = vals.where(vals['Source'] == 'B', inplace=False).rename(columns={"Value": "B"}).drop(['Source'], axis=1)
A_vals = A_vals.resample('100ms').mean().ffill().bfill()
B_vals = B_vals.resample('100ms').mean().ffill().bfill()
result = pd.concat([A_vals, B_vals], axis=1)

Upvotes: 1

Views: 442

Answers (1)

Alex Michel

Reputation: 436

I managed to get correct results using the following code, though I'm sure it can be optimized performance-wise:

private IList<(int rownum, DateTime time, double A, double B)> ResampleAndNormalize(IList<(DateTime time, double value, string source)> rawTicks, int interval = 100)
{
    // Average away ticks that share the same timestamp and source.
    var uniqueTicks = rawTicks.GroupBy(x => (time: x.time, source: x.source), x => x,
        (k, ticks) => (time: k.time, value: ticks.Average(x => x.value), source: k.source)).ToList();

    var ASeries = uniqueTicks.Where(x => x.source.ToLowerInvariant() == "a").Select(x => KeyValue.Create(x.time, x.value)).ToSeries();
    var BSeries = uniqueTicks.Where(x => x.source.ToLowerInvariant() == "b").Select(x => KeyValue.Create(x.time, x.value)).ToSeries();

    // Build a uniform key grid covering both series; +1 so the rounded end time is included.
    var startTime = ASeries.FirstKey().MinTime(BSeries.FirstKey()).RoundMs(interval);
    var endTime = ASeries.LastKey().MaxTime(BSeries.LastKey()).RoundMs(interval);
    var newKeys = Enumerable.Range(0, (int)(endTime.Subtract(startTime).TotalMilliseconds / interval) + 1)
        .Select(x => startTime.AddMilliseconds(x * interval)).ToList();

    // Bucket each series by rounding its keys to the interval, averaging values in the same bucket.
    var AResampled = ASeries.ResampleEquivalence(x => x.RoundMs(interval), x => x.Mean());
    var BResampled = BSeries.ResampleEquivalence(x => x.RoundMs(interval), x => x.Mean());

    // Align both series to the full grid, then forward-fill and back-fill the gaps.
    AResampled = AResampled.Realign(newKeys).FillMissing(Direction.Forward).FillMissing(Direction.Backward);
    BResampled = BResampled.Realign(newKeys).FillMissing(Direction.Forward).FillMissing(Direction.Backward);

    var results = new List<(int rownum, DateTime time, double A, double B)>();
    for (int i = 0; i < newKeys.Count; i++)
    {
        results.Add((rownum: i, time: newKeys[i], A: AResampled.GetAt(i), B: BResampled.GetAt(i)));
    }

    return results;
}
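As a sanity check on the key-grid construction, the same logic can be sketched in Python (a hypothetical `make_grid` helper, not part of Deedle). Covering both rounded endpoints takes (end - start) / interval + 1 keys:

```python
from datetime import datetime, timedelta

def make_grid(start: datetime, end: datetime, interval_ms: int) -> list:
    """Uniform keys from start to end inclusive, stepping interval_ms.
    Both endpoints are assumed to already be rounded to the interval."""
    steps = int((end - start) / timedelta(milliseconds=interval_ms)) + 1
    return [start + timedelta(milliseconds=i * interval_ms) for i in range(steps)]
```

For the sample data (09:30:00.000 through 09:30:03.500 at 500 ms), this yields the eight grid keys shown in the resampled tables.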

public static class DateTimeExtensions
{
    // Round to the nearest multiple of `precision` milliseconds; exact halves round up.
    public static DateTime RoundMs(this DateTime time, int precision)
    {
        var ticksPrecision = precision * TimeSpan.TicksPerMillisecond;
        var ticksRemainder = time.Ticks % ticksPrecision;
        if (ticksRemainder >= ticksPrecision / 2)
            ticksRemainder = ticksPrecision - ticksRemainder;
        else
            ticksRemainder = -ticksRemainder;
        return time.AddTicks(ticksRemainder);
    }

    public static DateTime MinTime(this DateTime a, DateTime b)
    {
        return a >= b ? b : a;
    }

    public static DateTime MaxTime(this DateTime a, DateTime b)
    {
        return a < b ? b : a;
    }
}
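The rounding used by `RoundMs` (nearest multiple of the precision, with exact halves rounding up) can be sketched in Python for plain millisecond integers; this `round_ms` helper is illustrative only:

```python
def round_ms(ts_ms: int, precision_ms: int) -> int:
    """Round ts_ms to the nearest multiple of precision_ms; exact halves
    round up, mirroring the >= comparison in the C# helper above."""
    remainder = ts_ms % precision_ms
    if remainder * 2 >= precision_ms:
        return ts_ms + (precision_ms - remainder)
    return ts_ms - remainder
```

For example, with a 500 ms precision, 417 rounds to 500, 230 rounds to 0, and 2763 rounds to 3000, matching the bucket assignments in the resampled table.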

Upvotes: 0
