Reputation: 17694
I have 3 time series which I need to compare over time. Obviously, they need to be aligned to be comparable. Unfortunately, 2 out of 3 time series are irregular; these two range from 2 to 500k observations per ID and day.
The initial time series is available every 300ms and could be left joined with both other time series.
However, I have 2 problems:
1. all time series have the schema ID, time, value, i.e. each group constitutes an individual time series
2. the join should be LEFT on the most fine-granular series and joinable within a window of time, as there might not be an exact match

Some dummy data:
import pandas as pd
import numpy as np

def make_df(frequency, valueName):
    # one day of synthetic observations at the given frequency,
    # each randomly assigned to one of five groups (i.e. individual time series)
    date_rng = pd.date_range(start='2018-01-01', end='2018-01-02', freq=frequency)
    ts = pd.Series(np.random.randn(len(date_rng)), index=date_rng)
    df = ts.to_frame(name=valueName)
    df['group'] = np.random.choice(['a', 'b', 'c', 'd', 'e'], size=len(date_rng))
    return df

df_1 = make_df('ms', 'value_A')  # millisecond frequency
display(df_1.head())
df_2 = make_df('H', 'value_B')   # hourly
display(df_2.head())
df_3 = make_df('S', 'value_C')   # secondly
display(df_3.head())
What I have tried so far (all not really pythonic):
I was trying a non-equi join, similar to a JOIN b ON a.group = b.group AND time IN window(some_seconds) in SQL. The problem is that when multiple records match, not only the first but all of them are matched and each generates a row.
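For illustration, here is a minimal pandas sketch of that kind of non-equi join on the dummy data above (the window_join helper and its window_seconds parameter are made up for this sketch): it cross-joins per group and then filters on the time difference, which reproduces exactly the multiple-match behaviour described above.

import pandas as pd

def window_join(left, right, window_seconds=5):
    # cross join per group (every left row paired with every right row of the same group),
    # then keep only the pairs whose timestamps lie within the window
    left = left.reset_index().rename(columns={'index': 'time'})
    right = right.reset_index().rename(columns={'index': 'time'})
    merged = left.merge(right, on='group', suffixes=('_A', '_B'))
    in_window = (merged['time_A'] - merged['time_B']).abs() <= pd.Timedelta(seconds=window_seconds)
    return merged[in_window]

# every right record inside the window produces its own output row
result = window_join(df_2, df_3)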
Also, I tried to group the data, similar to (spark) df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric"), but this might be quite lossy.
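The pandas analogue of that bucketing on the dummy df_3 would be a groupby with a time-based Grouper (just a sketch, not part of the original attempt); it puts everything on a common 5-minute grid per group, but collapses all observations inside a bucket into a single value, hence "lossy":

# aggregate value_C into fixed 5-minute buckets per group
df_3_bucketed = df_3.groupby(['group', pd.Grouper(freq='5min')])['value_C'].sum()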
Then I found (pandas) Pandas aligning multiple dataframes with TimeStamp index, which already looks quite interesting but only produces exact matches. However, when trying df_2.join(df_3, how='outer', on=['group'], rsuffix='_1'), which joins not only on (exact) time but also on group, it fails with an error that pd.concat is required.
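For reference, the closest pandas built-in to such a windowed join is probably pd.merge_asof (again only a sketch on the dummy data above, not one of the attempts described here); it joins per group within a tolerance, but keeps only the single nearest match per left row instead of all matches in the window:

left = df_2.reset_index().rename(columns={'index': 'time'}).sort_values('time')
right = df_3.reset_index().rename(columns={'index': 'time'}).sort_values('time')
# per-group join within a 5-second tolerance; unmatched left rows get NaN
aligned = pd.merge_asof(left, right, on='time', by='group',
                        tolerance=pd.Timedelta('5s'), direction='nearest')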
After some more searching I found (pyspark) https://github.com/twosigma/flint, which implements a time series join within an interval; however, I have problems using it.
Upvotes: 2
Views: 2398
Reputation: 17694
I could not find a simple way to do this in pandas, so I resorted to performing the operation directly in Spark.
Flint was my tool of choice. Initially, Flint would not work on Spark 2.2, but with my fix here: https://github.com/geoHeil/flint/commit/a2827d38e155ec8ddd4252dc62d89181f14f0c47 the following worked just fine:
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration.MILLISECONDS
import spark.implicits._

val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("groupB", "time", "valueB")
// wrap both DataFrames as time series (time in milliseconds) and join within a 1 second tolerance
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)
val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")
I.e. it joins on time only and thereby performs a sort of cartesian join over all the groups, which needs to be filtered afterwards:
mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
| time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000| 3| 0.3| 3| 13|
|2000000|     3|   0.4|     3|    14|
+-------+------+------+------+------+
To remove duplicate matches, distinct is used.
Upvotes: 1