pphong

Reputation: 57

Calculate Rolling Sum Expense each 6 months for each customer using Pyspark

I have a data set as shown below, and I want to calculate a rolling sum of expense over each 6 months for each customer using PySpark. I have tried using window functions from pyspark.sql but did not succeed. I wonder if anyone can help me with this.

Thank you.

Here is the sample data:

d = [
    {'id': 1, 'Month': 1, 'expense': 11.0},
    {'id': 1, 'Month': 1, 'expense': 15.0},
    {'id': 1, 'Month': 1, 'expense': 16.0}, 
    {'id': 1, 'Month': 2, 'expense': 12.0}, 
    {'id': 1, 'Month': 2, 'expense': 14.0}, 
    {'id': 1, 'Month': 3, 'expense': 6.0}, 
    {'id': 1, 'Month': 3, 'expense': 7.0}, 
    {'id': 1, 'Month': 3, 'expense': 4.0}, 
    {'id': 1, 'Month': 4, 'expense': 4.0}, 
    {'id': 1, 'Month': 5, 'expense': 6.0}, 
    {'id': 1, 'Month': 6, 'expense': 7.0}, 
    {'id': 1, 'Month': 7, 'expense': 8.0}, 
    {'id': 1, 'Month': 8, 'expense': 9.0},
    {'id': 2, 'Month': 1, 'expense': 1.0}, 
    {'id': 2, 'Month': 1, 'expense': 5.0},
    {'id': 2, 'Month': 1, 'expense': 6.0}, 
    {'id': 2, 'Month': 2, 'expense': 2.0}, 
    {'id': 2, 'Month': 2, 'expense': 4.0}, 
    {'id': 2, 'Month': 3, 'expense': 14.0}, 
    {'id': 2, 'Month': 3, 'expense': 17.0}, 
    {'id': 2, 'Month': 3, 'expense': 16.0},
    {'id': 1, 'Month': 4, 'expense': 4.0}, 
    {'id': 1, 'Month': 5, 'expense': 6.0}, 
    {'id': 1, 'Month': 6, 'expense': 7.0}, 
    {'id': 1, 'Month': 7, 'expense': 8.0}, 
    {'id': 1, 'Month': 8, 'expense': 9.0}]

Upvotes: 0

Views: 692

Answers (1)

Cena

Reputation: 3419

You can use the rangeBetween function to calculate the rolling sum.

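I'm defining my frame boundaries as rangeBetween(-1, 0). Because the window is ordered by Month, -1 means a Month value one less than the current row's, and 0 means the current row's Month. Expenses are first aggregated to one total per customer and month; summing those totals over this frame gives you a two-month rolling sum (for the six-month window in the question, use rangeBetween(-5, 0) instead):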

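from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# df is the input DataFrame with columns id, Month and expense
w = Window.partitionBy("id").orderBy("Month")

# 1) total expense per customer and month
# 2) sum the totals over the current and the previous Month value
# 3) drop each customer's first month, which has no previous month
df.groupBy("id", "Month").agg(F.sum(col("expense")).alias("total_expense")) \
  .withColumn("rank", F.row_number().over(w)) \
  .withColumn("two_month_rolling_sum", F.sum(col("total_expense")).over(w.rangeBetween(-1, 0))) \
  .filter(col("rank") != 1) \
  .drop("total_expense", "rank") \
  .orderBy("id", "Month").show()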

Output:

+---+-----+---------------------+
| id|Month|two_month_rolling_sum|
+---+-----+---------------------+
|  1|    2|                 68.0|
|  1|    3|                 43.0|
|  2|    2|                 18.0|
|  2|    3|                 53.0|
+---+-----+---------------------+
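
To make the frame semantics concrete, here is a minimal plain-Python sketch of what a range-based frame does. It is not PySpark code. The helper `rolling_sum_by_range` and the `gap` rows are hypothetical names and data made up for illustration, and `sample` is only the first three months of the question's data. It reproduces the numbers in the output above and shows what widening the frame to six months (the equivalent of rangeBetween(-5, 0)) would give on the same rows.

```python
from collections import defaultdict


def rolling_sum_by_range(rows, months_back):
    """Hypothetical helper mimicking a sum over rangeBetween(-months_back, 0)."""
    # Step 1: one total per (id, Month), like the groupBy(...).agg(sum) step.
    totals = defaultdict(float)
    for row in rows:
        totals[(row["id"], row["Month"])] += row["expense"]

    # Step 2: for each (id, Month), add the totals whose Month value lies in
    # [Month - months_back, Month]. This is a value range, not a row count.
    return {
        (cid, month): sum(
            total
            for (other_id, other_month), total in totals.items()
            if other_id == cid and month - months_back <= other_month <= month
        )
        for (cid, month) in totals
    }


# First three months of the question's sample data (assumption: this is the
# subset behind the output table above).
sample = [
    {"id": 1, "Month": 1, "expense": 11.0},
    {"id": 1, "Month": 1, "expense": 15.0},
    {"id": 1, "Month": 1, "expense": 16.0},
    {"id": 1, "Month": 2, "expense": 12.0},
    {"id": 1, "Month": 2, "expense": 14.0},
    {"id": 1, "Month": 3, "expense": 6.0},
    {"id": 1, "Month": 3, "expense": 7.0},
    {"id": 1, "Month": 3, "expense": 4.0},
    {"id": 2, "Month": 1, "expense": 1.0},
    {"id": 2, "Month": 1, "expense": 5.0},
    {"id": 2, "Month": 1, "expense": 6.0},
    {"id": 2, "Month": 2, "expense": 2.0},
    {"id": 2, "Month": 2, "expense": 4.0},
    {"id": 2, "Month": 3, "expense": 14.0},
    {"id": 2, "Month": 3, "expense": 17.0},
    {"id": 2, "Month": 3, "expense": 16.0},
]

two_month = rolling_sum_by_range(sample, 1)  # like rangeBetween(-1, 0)
six_month = rolling_sum_by_range(sample, 5)  # like rangeBetween(-5, 0)

print(two_month[(1, 2)], two_month[(1, 3)], two_month[(2, 2)], two_month[(2, 3)])
# 68.0 43.0 18.0 53.0
print(six_month[(1, 3)], six_month[(2, 3)])
# 85.0 65.0

# Made-up rows with month 2 missing: the range frame at month 3 only looks
# for Month values 2 and 3, so month 1 is not included.
gap = [
    {"id": 9, "Month": 1, "expense": 10.0},
    {"id": 9, "Month": 3, "expense": 5.0},
]
print(rolling_sum_by_range(gap, 1)[(9, 3)])
# 5.0
```

The last case is the reason to prefer rangeBetween over rowsBetween here: a row-based frame of the same size would pick up month 1 for that customer even though it is two months back.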

Upvotes: 0
