elyptikus

Reputation: 1148

Evaluate cumulative sum by category and reset when new day starts

This is my PySpark Dataframe:

timestamp                   category   value
2000-10-11 11:00:00+00:00   A          1
2000-10-11 12:00:00+00:00   A          2
2000-10-12 13:00:00+00:00   A          1
2000-10-12 14:00:00+00:00   A          3
2000-10-11 14:00:00+00:00   B          1
2000-10-11 15:00:00+00:00   B          1
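
For reference, the sample data can be built like this (a minimal sketch; parsing the timestamp strings with to_timestamp is an assumption about how the column was originally created):

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ('2000-10-11 11:00:00+00:00', 'A', 1),
        ('2000-10-11 12:00:00+00:00', 'A', 2),
        ('2000-10-12 13:00:00+00:00', 'A', 1),
        ('2000-10-12 14:00:00+00:00', 'A', 3),
        ('2000-10-11 14:00:00+00:00', 'B', 1),
        ('2000-10-11 15:00:00+00:00', 'B', 1),
    ],
    ['timestamp', 'category', 'value'],
).withColumn('timestamp', f.to_timestamp('timestamp'))  # assumed conversion to a timestamp column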

I want to get this result (a cumulative sum of value per category that resets whenever a new day starts):

timestamp                   category   value  cum_sum_by_date
2000-10-11 11:00:00+00:00   A          1      1
2000-10-11 12:00:00+00:00   A          2      3
2000-10-12 13:00:00+00:00   A          1      1
2000-10-12 14:00:00+00:00   A          3      4
2000-10-11 14:00:00+00:00   B          1      1
2000-10-11 15:00:00+00:00   B          1      2

I know how to get the cumulative sum grouped only by category, but I cannot reset the counter on every new day:

from pyspark.sql import Window
from pyspark.sql import functions as f

# running sum per category over all earlier rows -- this never resets at a day boundary
w = (Window.partitionBy('category').orderBy('timestamp')
      .rangeBetween(Window.unboundedPreceding, 0))

df = df.withColumn('cum_sum_by_category', f.sum('value').over(w))
df.show()

Upvotes: 0

Views: 130

Answers (1)

AdibP

Reputation: 2939

Your window w should be partitioned by the date of the timestamp in addition to the category column. Use the to_date function to extract the date from timestamp and include it in the partitioning of window w.

from pyspark.sql import Window
from pyspark.sql import functions as f

# partition by category AND calendar date so the running sum restarts each day
w = (Window
     .partitionBy('category', f.to_date(f.col('timestamp')))
     .orderBy('timestamp')
     .rangeBetween(Window.unboundedPreceding, 0))
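
Applying this window the same way as in your own attempt should then give the desired column (the name cum_sum_by_date here simply follows your expected output):

df = df.withColumn('cum_sum_by_date', f.sum('value').over(w))
df.show()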

Upvotes: 1
