FObersteiner

Reputation: 25564

Interpolate time series data from one df to time axis of another df in Python polars

I have time series data on different time axes in different dataframes. I need to interpolate data from one df onto the time axis of another df, df_ref. Example:

import polars as pl

# DataFrame with the reference time axis:
df_ref = pl.DataFrame({"dt": ["2022-12-14T14:00:01.000", "2022-12-14T14:00:02.000",
                              "2022-12-14T14:00:03.000", "2022-12-14T14:00:04.000",
                              "2022-12-14T14:00:05.000", "2022-12-14T14:00:06.000"]})
df_ref = df_ref.with_columns(pl.col("dt").str.to_datetime())

# DataFrame with a different frequency time axis, to be interpolated onto the reference time axis:
df = pl.DataFrame({
        "dt": ["2022-12-14T14:00:01.500", "2022-12-14T14:00:03.500", "2022-12-14T14:00:05.500"],
        "v1": [1.5, 3.5, 5.5]})
df = df.with_columns(pl.col("dt").str.to_datetime())

I cannot simply join the dfs since the keys don't match:

print(df_ref.join(df, on="dt", how="left").interpolate())
shape: (6, 2)
┌─────────────────────┬──────┐
│ dt                  ┆ v1   │
│ ---                 ┆ ---  │
│ datetime[μs]        ┆ f64  │
╞═════════════════════╪══════╡
│ 2022-12-14 14:00:01 ┆ null │
│ 2022-12-14 14:00:02 ┆ null │
│ 2022-12-14 14:00:03 ┆ null │
│ 2022-12-14 14:00:04 ┆ null │
│ 2022-12-14 14:00:05 ┆ null │
│ 2022-12-14 14:00:06 ┆ null │
└─────────────────────┴──────┘

So my 'iterative' approach would be to interpolate each column individually, for instance like this:

from scipy.interpolate import interp1d

f = interp1d(df["dt"].dt.timestamp(), df["v1"],
             kind="linear", bounds_error=False, fill_value="extrapolate")

out = f(df_ref["dt"].dt.timestamp())
df_ref = df_ref.with_columns(pl.Series(out).alias("v1_interp"))

print(df_ref.head(6))
shape: (6, 2)
┌─────────────────────┬───────────┐
│ dt                  ┆ v1_interp │
│ ---                 ┆ ---       │
│ datetime[μs]        ┆ f64       │
╞═════════════════════╪═══════════╡
│ 2022-12-14 14:00:01 ┆ 1.0       │
│ 2022-12-14 14:00:02 ┆ 2.0       │
│ 2022-12-14 14:00:03 ┆ 3.0       │
│ 2022-12-14 14:00:04 ┆ 4.0       │
│ 2022-12-14 14:00:05 ┆ 5.0       │
│ 2022-12-14 14:00:06 ┆ 6.0       │
└─────────────────────┴───────────┘

Although this gives the result I need, I wonder if there is a more idiomatic approach? I hesitate to mention efficiency here since I haven't benchmarked this with real data yet ("measure, don't guess!"). However, I'd assume that a native implementation in the underlying Rust code could add some performance benefits.

Upvotes: 5

Views: 1430

Answers (2)

jqurious

Reputation: 21319

The scipy.interpolate.interp1d example ends up calling this function.

You could use the same approach and process each column with .map():

import numpy as np
import polars as pl

def polars_ip(df_ref, df):
   old = df["dt"].dt.timestamp().to_numpy()
   new = df_ref["dt"].dt.timestamp().to_numpy()

   hi = np.searchsorted(old, new).clip(1, len(old) - 1)
   lo = hi - 1

   def _interp(column):
      column = column.to_numpy()
      slope = (column[hi] - column[lo]) / (old[hi] - old[lo])
      return pl.Series(slope * (new - old[lo]) + column[lo])
      
   values = (
      pl.concat([df, df_ref], how="diagonal")
        .select(pl.exclude("dt").map(_interp))
   )
   values.columns = [f"{name}_ref_ip" for name in values.columns]
   
   return df_ref.hstack(values)

>>> %time polars_ip(df_ref, df)
CPU times: user 48.1 ms, sys: 20.4 ms, total: 68.5 ms
Wall time: 22 ms
shape: (85536, 11)
>>> %time scipy_ip(df_ref, df)
CPU times: user 75.5 ms, sys: 5.51 ms, total: 81 ms
Wall time: 74.3 ms
shape: (85536, 11)

Check they return the same values:

>>> polars_ip(df_ref, df).frame_equal(scipy_ip(df_ref, df))
True

You can also generate the same values using:

N_COLS = 10
names = list(map(str, range(N_COLS)))
has_reading = pl.col(names[0]).is_not_null()
has_no_reading = has_reading.is_not()
(
   pl.concat([df, df_ref], how="diagonal")
   .sort("dt")
   .with_columns([
      pl.when(has_reading).then(pl.all())
        .shift(-1).backward_fill().suffix("_hi"),
      pl.when(has_reading).then(pl.all())
        .shift(+1).forward_fill().suffix("_lo")
      ])
   .with_columns([
      pl.when(has_reading).then(pl.col(r"^.+_hi$"))
        .forward_fill().backward_fill(),
      pl.when(has_reading).then(pl.col(r"^.+_lo$"))
        .backward_fill().forward_fill()
      ])
   .filter(has_no_reading)
   .with_column(
      pl.col(r"^dt.*$").dt.timestamp().suffix("_ts"))
   .with_columns([
      (((pl.col(f"{name}_hi")  - pl.col(f"{name}_lo")) 
         / (pl.col("dt_hi_ts") - pl.col("dt_lo_ts")))
         * (pl.col("dt_ts")    - pl.col("dt_lo_ts")) 
         + pl.col(f"{name}_lo"))
         .alias(f"{name}_ref_ip") for name in names
      ])
   .select([
      pl.col("dt"), pl.col("^.+_ref_ip$")
   ])
)  

Upvotes: 4

Dean MacGregor

Reputation: 18466

First approach

I think this is one of those problems where you want to create far more rows than you actually need and then filter back down to the ones you want.

Because polars's interpolate function only computes missing values in between known values, rather than extrapolating forwards and backwards, let's make our first step to manually extrapolate df1, adding an extra row before and after.

df1=df1.lazy()
df1=pl.concat([df1,
        df1.sort('dt').with_row_count('n') \
            .select(
                    [pl.col('n')] + \
                    [pl.when(pl.col('n')<=1) \
                       .then(pl.col(x)-(pl.col(x).shift(-1)-pl.col(x))) \
                    .when(pl.col('n')>=pl.col('n').max()-1) \
                       .then(pl.col(x)+(pl.col(x)-pl.col(x).shift(1)))
                               for x in df1.columns]
                ) \
        .filter((pl.col('n')==0) | (pl.col('n')==pl.col('n').max())) \
        .select(pl.exclude('n'))]).sort('dt')

I'm using a list comprehension in the select so this should be extensible to any number of columns.

The next thing to do is make a df with a dt column that runs from the earliest dt to the latest across df0 and df1, stepping by the minimum time difference. Fixing the step of your key column allows polars's interpolate to work as you expect.

specs = pl.concat([df0.select('dt'),df1.select('dt')]) \
          .sort('dt').select([
                       pl.col('dt').min().alias('mindt'),
                       pl.col('dt').max().alias('maxdt'), 
                       (pl.col('dt')-pl.col('dt').shift()).min().alias('mindiff')
                      ]).collect()


newdf = pl.DataFrame({'dt':pl.date_range(specs[0,0], specs[0,1], specs[0,2])}).lazy()

Alternatively, you can make newdf with a list comprehension in case dt isn't a datetime:

newdf = pl.DataFrame({'dt': [specs[0,0] + specs[0,2]*x for x in range(int(1+(specs[0,1]-specs[0,0])/specs[0,2]))]}).lazy()

With that you do an outer join between it and your two dfs, then use the built-in interpolate to get all the values you're looking for. You can chain a filter and select at the end to match your desired output.

newdf = newdf.join(df0, on='dt', how='outer') \
    .join(df1, on='dt', how='outer') \
    .with_columns([pl.col(x).interpolate().suffix('_interp') for x in df1.columns if x != 'dt']) \
    .filter(~pl.col('v0').is_null()).select(pl.exclude('v1')) \
    .collect()

Second approach

Another way to tackle the problem is to essentially recreate the scipy interpolate function with a bunch of shift and when/then statements...

First you do a diagonal concat and then add a bunch of helper columns representing the dt and v1 columns: one pair filled forward and another pair filled backward. Then calculate the change in v1 per unit time difference, which is itself carried forwards and backwards. Almost lastly is the when/then logic for beginning/ending/middle rows. True lastly is filtering and selecting away the helper columns.

    pl.concat([df0, df1], how='diagonal').sort('dt') \
    .with_column(pl.when(~pl.col('v1').is_null()).then(pl.col('dt')).alias('v1dt')) \
    .with_columns([
        pl.col('v1').fill_null(strategy='forward').alias('v1_for'),
        pl.col('v1dt').fill_null(strategy='forward').alias('v1dt_for'),
        pl.col('v1').fill_null(strategy='backward').alias('v1_back'),
        pl.col('v1dt').fill_null(strategy='backward').alias('v1dt_back')
        ]) \
    .with_column(((pl.col('v1_back')-pl.col('v1_for'))/(pl.col('v1dt_back')-pl.col('v1dt_for'))).alias('diff')) \
    .with_column((pl.when(pl.col('diff').is_nan()).then(None).otherwise(pl.col('diff'))).alias('diff')) \
    .with_column(pl.col('diff').fill_null(strategy='forward').fill_null(strategy='backward')) \
    .with_column((pl.when(~pl.col('v1').is_null()).then(pl.col('v1')) \
                .when((~pl.col('v1_for').is_null()) & (~pl.col('v1_back').is_null())) \
                        .then((pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff')+pl.col('v1_for')) \
                .when(~pl.col('v1_back').is_null()) \
                        .then(pl.col('v1_back')-(pl.col('v1dt_back')-pl.col('dt'))*pl.col('diff')) \
                .otherwise(pl.col('v1_for')+(pl.col('dt')-pl.col('v1dt_for'))*pl.col('diff'))).alias('v1_interp')) \
    .filter(~pl.col('v0').is_null()).select(['dt','v0','v1_interp'])

Upvotes: 1
