Reputation: 436
I have input of raw feeds from several sources that don't produce values at fixed rates, and I need to resample and normalize the data for further processing. Values are resampled to 500ms intervals, using the average to aggregate multiple values. Then a forward fill is applied to fill missing values with the last known value, and a back fill to fill any missing values at the beginning of the data.
#raw feed
time value source
09:30:00.230 2 B
09:30:00.417 3 B
09:30:00.417 1 A
09:30:00.653 3 A
09:30:01.450 2 B
09:30:01.887 5 A
09:30:02.653 5 B
09:30:02.763 3 B
09:30:02.967 5 B
09:30:03.107 6 A
09:30:03.670 6 B
#resampled to 500ms intervals using average
time A B
09:30:00.000 NULL 2
09:30:00.500 2 3
09:30:01.000 NULL NULL
09:30:01.500 NULL 2
09:30:02.000 5 NULL
09:30:02.500 NULL 5
09:30:03.000 6 4
09:30:03.500 NULL 6
#ffill+bfill
time A B
09:30:00.000 2 2
09:30:00.500 2 3
09:30:01.000 2 3
09:30:01.500 2 2
09:30:02.000 5 2
09:30:02.500 5 5
09:30:03.000 6 4
09:30:03.500 6 6
I used the following code, but I doubt it is an efficient way to use Deedle, and the resulting dataframe contains duplicate values due to the full outer join. So now I need some way to aggregate them, or to split them into series and resample them again. Please advise if there is a better way to meet the requirements.
/// <summary>
/// Resamples raw tick data from sources "A" and "B" onto a uniform 500 ms grid,
/// joins them into one frame, and fills gaps forward then backward.
/// </summary>
/// <param name="rawSource">Raw (time, value, source) ticks; source is "A" or "B" in any casing.</param>
private void Resample(IList<(DateTime time, double value, string source)> rawSource)
{
    // BUG FIX: the original compared the ToLowerInvariant() result against the
    // UPPER-case literals "A"/"B", which can never match, so both series came
    // out empty. Use a case-insensitive comparison instead.
    var sourceASeries = rawSource
        .Where(x => string.Equals(x.source, "A", StringComparison.OrdinalIgnoreCase))
        .Select(x => KeyValue.Create(x.time, x.value))
        .ToSeries();
    var sourceBSeries = rawSource
        .Where(x => string.Equals(x.source, "B", StringComparison.OrdinalIgnoreCase))
        .Select(x => KeyValue.Create(x.time, x.value))
        .ToSeries();
    // NOTE(review): ResampleUniform with Lookup.ExactOrSmaller picks a single
    // value at/before each grid key rather than averaging the bucket —
    // presumably why this differs from the pandas mean() result; confirm
    // whether ResampleEquivalence + Mean() is what is actually required.
    var sourceAResampled = sourceASeries.ResampleUniform(dt => dt.RoundMs(500), dt => dt.RoundMs(500).AddMilliseconds(500),
        Lookup.ExactOrSmaller);
    var sourceBResampled = sourceBSeries.ResampleUniform(dt => dt.RoundMs(500), dt => dt.RoundMs(500).AddMilliseconds(500),
        Lookup.ExactOrSmaller);
    var df = Frame.FromColumns(new[] { sourceAResampled, sourceBResampled });
    // Forward-fill first, then back-fill so any leading gaps get the first observed value.
    df = df.FillMissing(Direction.Forward).FillMissing(Direction.Backward);
}
In Python, using pandas, it works fine for me with the following code:
# Pandas equivalent: split by source, resample each series to the grid with
# mean(), forward-/back-fill, then join the two columns side by side.
import pandas as pd  # fixed garbled "import Bs as pd"

# fixed garbled literals: ' A' / " A" had a stray leading space, which would
# make the source filter match nothing
A_vals = vals.where(vals['Source'] == 'A', inplace=False).rename(columns={"Value": "A"}).drop(['Source'], axis=1)
B_vals = vals.where(vals['Source'] == 'B', inplace=False).rename(columns={"Value": "B"}).drop(['Source'], axis=1)
A_vals = A_vals.resample('100ms').mean().ffill().bfill()
B_vals = B_vals.resample('100ms').mean().ffill().bfill()
result = pd.concat([A_vals, B_vals], axis=1)
Upvotes: 1
Views: 442
Reputation: 436
I managed to get correct results using the following code, though I'm sure it can be optimized performance-wise:
/// <summary>
/// Resamples raw ticks from sources "A" and "B" onto a uniform grid of
/// <paramref name="interval"/> ms (bucket mean), aligns both series to the
/// common grid, and forward- then back-fills gaps.
/// </summary>
/// <param name="rawTicks">Raw (time, value, source) ticks; source is "A" or "B" in any casing.</param>
/// <param name="interval">Grid step in milliseconds (default 100).</param>
/// <returns>One row per grid timestamp with the filled A and B values.</returns>
private IList<(int rownum, DateTime time, double A, double B)> ResampleAndNormalize(IList<(DateTime time, double value, string source)> rawTicks, int interval = 100)
{
    // Collapse duplicate (time, source) ticks to their mean so each series gets unique keys.
    var uniqueTicks = rawTicks.GroupBy(x => (time: x.time, source: x.source), x => x,
        (k, ticks) => (time: k.time, value: ticks.Average(x => x.value), source: k.source)).ToList();
    // BUG FIX: ToLowerInvariant() == "A" can never match an upper-case literal,
    // so both series were empty; use a case-insensitive comparison.
    var ASeries = uniqueTicks.Where(x => string.Equals(x.source, "A", StringComparison.OrdinalIgnoreCase)).Select(x => KeyValue.Create(x.time, x.value)).ToSeries();
    var BSeries = uniqueTicks.Where(x => string.Equals(x.source, "B", StringComparison.OrdinalIgnoreCase)).Select(x => KeyValue.Create(x.time, x.value)).ToSeries();
    var startTime = ASeries.FirstKey().MinTime(BSeries.FirstKey()).RoundMs(interval);
    var endTime = ASeries.LastKey().MaxTime(BSeries.LastKey()).RoundMs(interval);
    // BUG FIX: both endpoints are rounded to multiples of the interval, so the
    // division is exact and the original Ceiling(...) count excluded endTime
    // itself (dropping the last grid row, e.g. 09:30:03.500 in the example).
    // Add 1 so the final grid point gets a row.
    var newKeys = Enumerable.Range(0, (int)(endTime.Subtract(startTime).TotalMilliseconds / interval) + 1)
        .Select(x => startTime.AddMilliseconds(x * interval)).ToList();
    // Bucket mean per grid point, then realign to the common grid and fill gaps
    // forward first so leading gaps are covered by the subsequent back-fill.
    var AResampled = ASeries.ResampleEquivalence(x => x.RoundMs(interval), x => x.Mean());
    var BResampled = BSeries.ResampleEquivalence(x => x.RoundMs(interval), x => x.Mean());
    AResampled = AResampled.Realign(newKeys).FillMissing(Direction.Forward).FillMissing(Direction.Backward);
    BResampled = BResampled.Realign(newKeys).FillMissing(Direction.Forward).FillMissing(Direction.Backward);
    var results = new List<(int rownum, DateTime time, double A, double B)>(newKeys.Count);
    for (int i = 0; i < newKeys.Count; i++)
    {
        // BUG FIX: rownum was hard-coded to 0 for every row; emit the row index.
        results.Add((rownum: i, time: newKeys[i], A: AResampled.GetAt(i), B: BResampled.GetAt(i)));
    }
    return results;
}
/// <summary>
/// DateTime helpers used by the resampling code: millisecond-grid rounding
/// and min/max of two timestamps.
/// </summary>
public static class DateTimeExtensions
{
    /// <summary>
    /// Rounds a timestamp to the nearest multiple of <paramref name="precision"/>
    /// milliseconds; values exactly halfway between two grid points round up.
    /// </summary>
    public static DateTime RoundMs(this DateTime time, int precision)
    {
        long bucketTicks = precision * TimeSpan.TicksPerMillisecond;
        long intoBucket = time.Ticks % bucketTicks;
        // Round half up: jump to the next grid point at or past the midpoint,
        // otherwise drop back to the previous one.
        long delta = intoBucket >= bucketTicks / 2 ? bucketTicks - intoBucket : -intoBucket;
        return time.AddTicks(delta);
    }

    /// <summary>Returns the earlier of the two timestamps.</summary>
    public static DateTime MinTime(this DateTime a, DateTime b) => a >= b ? b : a;

    /// <summary>Returns the later of the two timestamps.</summary>
    public static DateTime MaxTime(this DateTime a, DateTime b) => a < b ? b : a;
}
Upvotes: 0