Reputation: 1148
This is my PySpark Dataframe:
timestamp                  category  value
2000-10-11 11:00:00+00:00  A         1
2000-10-11 12:00:00+00:00  A         2
2000-10-12 13:00:00+00:00  A         1
2000-10-12 14:00:00+00:00  A         3
2000-10-11 14:00:00+00:00  B         1
2000-10-11 15:00:00+00:00  B         1
I want to get this result (a cumulative sum of value per category that resets on every new day):
timestamp                  category  value  cum_sum_by_date
2000-10-11 11:00:00+00:00  A         1      1
2000-10-11 12:00:00+00:00  A         2      3
2000-10-12 13:00:00+00:00  A         1      1
2000-10-12 14:00:00+00:00  A         3      4
2000-10-11 14:00:00+00:00  B         1      1
2000-10-11 15:00:00+00:00  B         1      2
I know how to get the cumulative sum grouped just by category, but I cannot reset the counter on every new day:
from pyspark.sql import Window
from pyspark.sql import functions as f

# cumulative sum per category, ordered by timestamp (never resets within a category)
w = (Window.partitionBy('category').orderBy('timestamp')
     .rangeBetween(Window.unboundedPreceding, 0))
df = df.withColumn('cum_sum_by_category', f.sum('value').over(w))
df.show()
Upvotes: 0
Views: 130
Reputation: 2939
Your window w should be partitioned by the date of the timestamp alongside the category column. Use the to_date function to extract the date from the timestamp and partition the window w by it.
from pyspark.sql import Window
from pyspark.sql import functions as f

# partition by both the category and the calendar date, so the running sum restarts each day
w = (Window
     .partitionBy('category', f.to_date(f.col('timestamp')))
     .orderBy('timestamp')
     .rangeBetween(Window.unboundedPreceding, 0))
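A minimal usage sketch, assuming df is the DataFrame from the question; the column name cum_sum_by_date just mirrors the label used in the expected output:

# apply the per-category, per-day window to the running sum
df = df.withColumn('cum_sum_by_date', f.sum('value').over(w))
df.show()
# the 2000-10-12 rows for category A now restart at 1 and 4, as in the expected output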
Upvotes: 1