Ehrendil

Reputation: 253

Resampling time-series data with pyspark

I have time-series data that looks a bit like this (timestamp, value):

14 Dec 2020    1000
15 Jan 2021    1000
20 Jan 2021    1000
18 Feb 2021    1000
03 Mar 2021    1000

I'm essentially trying to get monthly values by spreading each value evenly over the days it covers. Each row's "value" applies to the interval between two consecutive dates, so to calculate the value for January we'd need to combine:

15 days of January from the value in December + 5 days between Jan 15 - Jan 20 + 11 days between Jan 20 - Feb 18.

Each contribution would be calculated as (number of days of the interval that fall in the month / length of the whole interval) * value:

Value for Jan: (15/32) * 1000 + (5/5) * 1000 + (11/29) * 1000
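
The arithmetic above can be checked with a short plain-Python sketch. It only illustrates the formula and is not PySpark code. The helper name `month_share` is hypothetical, and it assumes each interval covers the days after its start date up to and including its end date. Under that convention, Jan 20 to Feb 18 spans 29 days, so the last term comes out as 11/29.

```python
from datetime import date, timedelta

def month_share(start, end, year, month):
    """Fraction of the days in (start, end] that fall in the given month."""
    total = (end - start).days
    in_month = sum(
        1
        for offset in range(1, total + 1)
        if (start + timedelta(days=offset)).year == year
        and (start + timedelta(days=offset)).month == month
    )
    return in_month / total

# Sample data from the question; every row carries the same value.
dates = [date(2020, 12, 14), date(2021, 1, 15), date(2021, 1, 20),
         date(2021, 2, 18), date(2021, 3, 3)]
value = 1000

# Sum the January share of every interval between consecutive dates.
january = sum(
    value * month_share(start, end, 2021, 1)
    for start, end in zip(dates, dates[1:])
)
print(round(january, 2))  # 1848.06
```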

I've tried resampling with the window function, but resampling on 1 month raises an exception, and it also simply returns the intervals instead of resampling everything.

Any advice is appreciated. Thanks.

Upvotes: 0

Views: 1144

Answers (1)

mck

Reputation: 42422

You can spread each value evenly over the days between consecutive dates using sequence, then group by month and sum the daily values in each month.

EDIT: used a UDF from this answer because sequence is not supported in Spark 2.2

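import datetime

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DateType

def generate_date_series(start, stop):
    # Every date from start to stop, inclusive; empty if start is after stop
    return [start + datetime.timedelta(days=x) for x in range(0, (stop - start).days + 1)]

# Register the function for use in SQL expressions (assumes an existing SparkSession named spark)
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))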

result = df.withColumn(
    'timestamp',
    F.to_date(F.col('timestamp'), 'dd MMM yyyy')
).withColumn(
    'next_timestamp',
    F.expr("""
        generate_date_series(
            lag(timestamp, 1, timestamp + interval 1 day) -- default is used for the first row, which has no previous date
                over(order by timestamp) + interval 1 day,  -- don't want to include the previous date
            timestamp
        )
    """)
).select(
    F.explode('next_timestamp').alias('timestamp'), 
    (F.col('value') / F.size('next_timestamp')).alias('value')
).groupBy(
    F.year('timestamp').alias('year'),
    F.month('timestamp').alias('month')
).agg(
    F.sum('value').alias('value')
).orderBy('year', 'month')

result.show(truncate=False)
+----+-----+------------------+
|year|month|value             |
+----+-----+------------------+
|2020|12   |531.25            |
|2021|1    |1848.0603448275874|
|2021|2    |1389.920424403183 |
|2021|3    |230.76923076923077|
+----+-----+------------------+
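
As a cross-check, the same per-day spreading can be reproduced in plain Python without Spark. This is a minimal sketch, assuming the five sample rows from the question. The `rows`, `monthly`, and `previous` names are illustrative, and the loop stands in for the `lag` window and `explode` steps. The first row has no earlier date, so it contributes nothing, which mirrors the Spark version: the series for that row is empty and `explode` drops it.

```python
from collections import defaultdict
from datetime import date, timedelta

def generate_date_series(start, stop):
    # Same helper as in the answer: every day from start to stop, inclusive.
    return [start + timedelta(days=x) for x in range(0, (stop - start).days + 1)]

# Sample rows from the question (timestamp, value).
rows = [
    (date(2020, 12, 14), 1000),
    (date(2021, 1, 15), 1000),
    (date(2021, 1, 20), 1000),
    (date(2021, 2, 18), 1000),
    (date(2021, 3, 3), 1000),
]

monthly = defaultdict(float)
previous = None
for timestamp, value in rows:
    if previous is not None:  # the first row has no previous date
        # Days after the previous date up to and including this one.
        days = generate_date_series(previous + timedelta(days=1), timestamp)
        for day in days:
            # Each day receives an equal share of the row's value.
            monthly[(day.year, day.month)] += value / len(days)
    previous = timestamp

for (year, month), total in sorted(monthly.items()):
    print(year, month, round(total, 2))
```

Rounded to two decimals, this prints 531.25, 1848.06, 1389.92 and 230.77 for the four months, matching the table above. The totals add up to 4000 rather than 5000 because the first row's value is not assigned to any interval.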

Upvotes: 2
