Reputation: 253
I have timeseries data that looks a bit like this (timestamp, value):
14 Dec 2020 1000
15 Jan 2021 1000
20 Jan 2021 1000
18 Feb 2021 1000
03 Mar 2021 1000
I'm essentially trying to get monthly values, smoothing out the value for every month. Each row represents the "value" for the interval between two consecutive dates, so if we wanted to calculate the value for January, we'd need it to represent:
15 days of January from the value in December + 5 days between Jan 15 - Jan 20 + 11 days between Jan 20 - Feb 18.
The value would be calculated as (number of days of the interval that fall in the current month / length of the whole interval) * value:
Value for Jan: (15/32) * 1000 + (5/5) * 1000 + (11/29) * 1000
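As a sanity check on that arithmetic, here is a minimal plain-Python sketch (no Spark involved). The helper name days_in_month is made up for illustration, and the sketch assumes each interval takes the value of the row that closes it; since every sample value is 1000, that choice does not change the result here. Counting calendar days, the Jan 20 to Feb 18 interval comes out at 29 days.

```python
import datetime as dt

def days_in_month(start, stop, year, month):
    """Count dates d with start < d <= stop that fall in (year, month).

    Hypothetical helper, written only for this illustration.
    """
    total = (stop - start).days
    dates = (start + dt.timedelta(days=k) for k in range(1, total + 1))
    return sum(1 for d in dates if d.year == year and d.month == month)

rows = [
    (dt.date(2020, 12, 14), 1000),
    (dt.date(2021, 1, 15), 1000),
    (dt.date(2021, 1, 20), 1000),
    (dt.date(2021, 2, 18), 1000),
    (dt.date(2021, 3, 3), 1000),
]

jan = 0.0
# Walk over consecutive pairs of rows; each pair is one interval.
for (prev, _), (cur, value) in zip(rows, rows[1:]):
    interval_length = (cur - prev).days
    jan += value * days_in_month(prev, cur, 2021, 1) / interval_length

print(round(jan, 2))  # 1848.06
```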
I've tried resampling with the window function, but resampling on 1 month throws an exception, and it simply returns the intervals instead of resampling everything.
Any advice is appreciated. Thanks.
Upvotes: 0
Views: 1144
Reputation: 42422
You can interpolate the values between the dates using sequence, then group by month and sum the values in each month.
EDIT: used a UDF from this answer because sequence is not supported in Spark 2.2.
    import datetime

    import pyspark.sql.functions as F
    from pyspark.sql.types import ArrayType, DateType

    def generate_date_series(start, stop):
        # Every date from start to stop, inclusive; empty list if start is after stop.
        return [start + datetime.timedelta(days=x) for x in range(0, (stop - start).days + 1)]

    spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))

    result = df.withColumn(
        'timestamp',
        F.to_date(F.col('timestamp'), 'dd MMM yyyy')
    ).withColumn(
        'next_timestamp',
        F.expr("""
            generate_date_series(
                lag(timestamp, 1, timestamp + interval 1 day)  -- need a default value for the first row (it has no previous date)
                    over(order by timestamp) + interval 1 day, -- don't want to include the previous date
                timestamp
            )
        """)
    ).select(
        # One output row per day in the interval, each carrying an equal share of the value.
        F.explode('next_timestamp').alias('timestamp'),
        (F.col('value') / F.size('next_timestamp')).alias('value')
    ).groupBy(
        F.year('timestamp').alias('year'),
        F.month('timestamp').alias('month')
    ).agg(
        F.sum('value').alias('value')
    ).orderBy('year', 'month')

    result.show(truncate=False)
    +----+-----+------------------+
    |year|month|value             |
    +----+-----+------------------+
    |2020|12   |531.25            |
    |2021|1    |1848.0603448275874|
    |2021|2    |1389.920424403183 |
    |2021|3    |230.76923076923077|
    +----+-----+------------------+
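To cross-check these numbers without a Spark session, the same expand-then-sum logic can be sketched in plain Python. This is only an illustration under stated assumptions: the rows list is the sample data from the question typed in by hand, and pairing consecutive rows with zip stands in for the lag window. Skipping the first row mirrors the Spark version, where the first row gets an empty date series and explode emits nothing for it.

```python
import datetime
from collections import defaultdict

def generate_date_series(start, stop):
    # Same helper as in the answer: every date from start to stop, inclusive.
    return [start + datetime.timedelta(days=x) for x in range(0, (stop - start).days + 1)]

# Sample data from the question, entered by hand for this sketch.
rows = [
    (datetime.date(2020, 12, 14), 1000),
    (datetime.date(2021, 1, 15), 1000),
    (datetime.date(2021, 1, 20), 1000),
    (datetime.date(2021, 2, 18), 1000),
    (datetime.date(2021, 3, 3), 1000),
]

monthly = defaultdict(float)
for (prev, _), (cur, value) in zip(rows, rows[1:]):
    # Dates after the previous row's date, up to and including this row's date.
    days = generate_date_series(prev + datetime.timedelta(days=1), cur)
    for d in days:
        # Each day carries an equal share of the row's value.
        monthly[(d.year, d.month)] += value / len(days)

for key in sorted(monthly):
    print(key, round(monthly[key], 2))
# (2020, 12) 531.25
# (2021, 1) 1848.06
# (2021, 2) 1389.92
# (2021, 3) 230.77
```

The monthly totals add up to 4000, i.e. the four rows that have a predecessor; the first row's value is not distributed anywhere.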
Upvotes: 2