I have a data set as below, and I want to calculate a rolling sum of expense over each 6 months for each customer using PySpark. I tried using Window from pyspark.sql but did not succeed. I wonder if anyone can help me with this.
Thank you.
Here is the sample data:
d = [
{'id': 1, 'Month': 1, 'expense': 11.0},
{'id': 1, 'Month': 1, 'expense': 15.0},
{'id': 1, 'Month': 1, 'expense': 16.0},
{'id': 1, 'Month': 2, 'expense': 12.0},
{'id': 1, 'Month': 2, 'expense': 14.0},
{'id': 1, 'Month': 3, 'expense': 6.0},
{'id': 1, 'Month': 3, 'expense': 7.0},
{'id': 1, 'Month': 3, 'expense': 4.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0},
{'id': 2, 'Month': 1, 'expense': 1.0},
{'id': 2, 'Month': 1, 'expense': 5.0},
{'id': 2, 'Month': 1, 'expense': 6.0},
{'id': 2, 'Month': 2, 'expense': 2.0},
{'id': 2, 'Month': 2, 'expense': 4.0},
{'id': 2, 'Month': 3, 'expense': 14.0},
{'id': 2, 'Month': 3, 'expense': 17.0},
{'id': 2, 'Month': 3, 'expense': 16.0},
{'id': 1, 'Month': 4, 'expense': 4.0},
{'id': 1, 'Month': 5, 'expense': 6.0},
{'id': 1, 'Month': 6, 'expense': 7.0},
{'id': 1, 'Month': 7, 'expense': 8.0},
{'id': 1, 'Month': 8, 'expense': 9.0}]
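You can use the rangeBetween method of the window specification to calculate the rolling sum. I'm defining my frame boundaries as rangeBetween(-1, 0). Because this is a range frame, -1 means the frame starts at the Month value one less than the current row's Month (it works on the values of the ordering column, not on row positions), and 0 means it ends at the current row. Summing over this frame gives you a two-month rolling sum (for the six-month window in the question, use rangeBetween(-5, 0) instead):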
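from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(d)  # d is the sample data from the question

# Window over each customer's months, ordered by Month
w = Window.partitionBy("id").orderBy("Month")

(df.groupBy("id", "Month").agg(F.sum(col("expense")).alias("total_expense"))
    .withColumn("rank", F.row_number().over(w))
    # Range frame: rows whose Month is between (current Month - 1) and current Month
    .withColumn("two_month_rolling_sum", F.sum(col("total_expense")).over(w.rangeBetween(-1, 0)))
    .filter(col("rank") != 1)  # drop each customer's first month
    .drop("total_expense", "rank")
    .orderBy("id", "Month")
    .show())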
Output:
+---+-----+---------------------+
| id|Month|two_month_rolling_sum|
+---+-----+---------------------+
| 1| 2| 68.0|
| 1| 3| 43.0|
| 2| 2| 18.0|
| 2| 3| 53.0|
+---+-----+---------------------+
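As a Spark-free cross-check of what the range frame computes, here is a plain-Python sketch of the same logic: total the expenses per (id, Month), then for each month add up the totals of the k months ending at it, which is what F.sum(...).over(w.rangeBetween(-(k - 1), 0)) does on the monthly totals. The function name rolling_sum_by_month is hypothetical, and the data are only the question's rows for months 1 to 3. Unlike the Spark code above, it keeps each customer's first month instead of filtering it out.

```python
from collections import defaultdict


def rolling_sum_by_month(rows, k):
    """Hypothetical helper: per-customer rolling sum over the k months ending
    at each month, mirroring a Spark range frame rangeBetween(-(k - 1), 0)."""
    # Step 1: total expense per (id, Month), like groupBy("id", "Month").agg(sum)
    totals = defaultdict(float)
    for r in rows:
        totals[(r["id"], r["Month"])] += r["expense"]

    # Step 2: for each (id, Month), sum totals whose Month lies in
    # [Month - (k - 1), Month]. Months with no rows contribute 0.0,
    # because the frame is defined on Month values, not row positions.
    result = {}
    for (cid, month) in totals:
        result[(cid, month)] = sum(
            totals.get((cid, m), 0.0) for m in range(month - k + 1, month + 1)
        )
    return dict(sorted(result.items()))


# The question's rows for months 1 to 3 only
d = [
    {'id': 1, 'Month': 1, 'expense': 11.0},
    {'id': 1, 'Month': 1, 'expense': 15.0},
    {'id': 1, 'Month': 1, 'expense': 16.0},
    {'id': 1, 'Month': 2, 'expense': 12.0},
    {'id': 1, 'Month': 2, 'expense': 14.0},
    {'id': 1, 'Month': 3, 'expense': 6.0},
    {'id': 1, 'Month': 3, 'expense': 7.0},
    {'id': 1, 'Month': 3, 'expense': 4.0},
    {'id': 2, 'Month': 1, 'expense': 1.0},
    {'id': 2, 'Month': 1, 'expense': 5.0},
    {'id': 2, 'Month': 1, 'expense': 6.0},
    {'id': 2, 'Month': 2, 'expense': 2.0},
    {'id': 2, 'Month': 2, 'expense': 4.0},
    {'id': 2, 'Month': 3, 'expense': 14.0},
    {'id': 2, 'Month': 3, 'expense': 17.0},
    {'id': 2, 'Month': 3, 'expense': 16.0},
]

two_month = rolling_sum_by_month(d, 2)  # same frame as rangeBetween(-1, 0)
six_month = rolling_sum_by_month(d, 6)  # same frame as rangeBetween(-5, 0)
print(two_month)
print(six_month)
```

With k = 2 it reproduces the values in the output above (68.0 and 43.0 for id 1, 18.0 and 53.0 for id 2); with k = 6 it gives the six-month sums the question asks for. Because the frame is defined on Month values, a month with no rows simply contributes nothing, whereas a row-based frame (rowsBetween) would reach back to an older month instead.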