Georg Heiler

Reputation: 17694

pandas align irregular time series with different frequencies

I have 3 time series which I need to compare over time. Obviously, they need to be aligned to be comparable. Unfortunately, 2 out of 3 time series are irregular. These two range from 2 to 500k observations per ID and day.

The initial time series is available every 300ms and could be left joined with both other time series.

However, I have 2 problems:

  1. the 3 series introduced above are in the format ID, time, value, i.e. each constitutes an individual time series per group
  2. formulating the join condition, i.e. the LEFT and most fine-granular series should be joinable within a window of time, as there might not be an exact match

edit

some dummy data

import pandas as pd
from datetime import datetime
import numpy as np

def make_df(frequency, valueName):
    # one day of synthetic data at the given frequency, randomly assigned to 5 groups
    date_rng = pd.date_range(start='2018-01-01', end='2018-01-02', freq=frequency)
    ts = pd.Series(np.random.randn(len(date_rng)), index=date_rng)
    groups = ['a', 'b', 'c', 'd', 'e']
    group_series = [groups[np.random.randint(len(groups))] for _ in range(len(date_rng))]
    df = ts.to_frame(valueName)
    df['group'] = group_series
    return df

df_1 = make_df('ms', 'value_A')
display(df_1.head())
df_2 = make_df('H', 'value_B')
display(df_2.head())
df_3 = make_df('S', 'value_C')
display(df_3.head())

What I tried (none of it really pythonic): a non-equi-join similar to a JOIN b ON a.group = b.group AND time in window(some_seconds) in SQL, but this has the problem that when multiple records match, not only the first but all of them generate a row.
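
Roughly, a sketch of what I mean against the dummy frames above (using df_3 and df_2 to keep it small; the 1-second window is just an example), which also shows the duplication problem:

left = df_3.reset_index().rename(columns={'index': 'time'})
right = df_2.reset_index().rename(columns={'index': 'time'})
# merge on group only, then keep pairs whose timestamps lie within the window
candidates = left.merge(right, on='group', suffixes=('_C', '_B'))
window = pd.Timedelta(seconds=1)
joined = candidates[(candidates['time_C'] - candidates['time_B']).abs() <= window]
# every right-hand record inside the window survives, so one left row can
# generate several output rows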

Also, I tried to group the data similarly to (Spark): df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric"), but this might be quite lossy.
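
In pandas, the analogous (and similarly lossy) bucketing would be something like this (5-minute buckets picked arbitrarily); all observations inside a bucket are collapsed into one value:

bucketed = df_3.groupby(['group', pd.Grouper(freq='5min')])['value_C'].sum()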

Then I found (pandas) Pandas aligning multiple dataframes with TimeStamp index, which already looks quite interesting but only produces exact matches. When trying to use df_2.join(df_3, how='outer', on=['group'], rsuffix='_1'), which joins not only on (exact) time but also on group, it fails with an error stating that pd.concat is required.
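
For completeness, the closest pandas built-in I am aware of is pd.merge_asof, which joins per group within a tolerance, but it only returns the single nearest match per left row, not all matches inside the window (30-minute tolerance picked arbitrarily):

left = df_3.reset_index().rename(columns={'index': 'time'}).sort_values('time')
right = df_2.reset_index().rename(columns={'index': 'time'}).sort_values('time')
aligned = pd.merge_asof(
    left, right,
    on='time',                       # join key, must be sorted
    by='group',                      # only match within the same group
    tolerance=pd.Timedelta('30min'),
    direction='nearest',
)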

After some more searching I found (pyspark) https://github.com/twosigma/flint, which implements a time series join within an interval; however, I have problems using it.

Upvotes: 2

Views: 2398

Answers (1)

Georg Heiler

Reputation: 17694

I could not find a simple way to do this in pandas, so I resorted to performing the operation directly in Spark.

Flint was my tool of choice. Initially, flint would not work with Spark 2.2, but with my fix here: https://github.com/geoHeil/flint/commit/a2827d38e155ec8ddd4252dc62d89181f14f0c47 the following worked just fine:

import java.util.concurrent.TimeUnit.MILLISECONDS
import com.twosigma.flint.timeseries.TimeSeriesRDD
import org.apache.spark.sql.functions.col
import spark.implicits._

val left = Seq((1, 1L, 0.1), (1, 2L, 0.2), (3, 1L, 0.3), (3, 2L, 0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1, 1L, 11), (1, 2L, 12), (3, 1L, 13), (3, 2L, 14)).toDF("groupB", "time", "valueB")

val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)
// join on time with a tolerance of 1 second (no key column, hence the group filter below)
val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")

i.e. it performs a sort of Cartesian join over all the groups:

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|

To remove the duplicates this produces, distinct is used.

Upvotes: 1
