thalesthales

Reputation: 95

PySpark: For each month, compute a cumulative sum over the previous 3 months

I'm using PySpark and I'm trying to compute, for each month, a cumulative sum over the last 3 months (that month plus the two before it):

Example:

Month   Value
Jan/19    1
Feb/19    0
Mar/19    4
Apr/19    5
May/19    0
Jun/19   10

So the cumulative sum for each month over that month and the two previous months will be:

Month   Sum
Jan/19  1          = 1
Feb/19  1 + 0      = 1
Mar/19  1 + 0 + 4  = 5
Apr/19  0 + 4 + 5  = 9
May/19  4 + 5 + 0  = 9
Jun/19  5 + 0 + 10 = 15

I'm pretty sure that I need to use window and partition functions, but I have no idea how to set this up.

Can anyone help me with this?

Thanks

Upvotes: 3

Views: 4141

Answers (1)

murtihash

Reputation: 8410

Sample DataFrame:

df.show()
+------+-----+
| Month|Value|
+------+-----+
|Jan/19|    1|
|Feb/19|    0|
|Mar/19|    4|
|Apr/19|    5|
|May/19|    0|
|Jun/19|   10|
+------+-----+

You can use a window function, but you first need to convert your Month column to a proper timestamp and then cast it to long (Unix time in seconds), so that rangeBetween can express a range of roughly 3 months in seconds (86400 is 1 day in seconds). In your real data, you can add a partitionBy on your grouping columns.

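from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Range frame over Unix seconds: from 89 days before the current row up to the current row.
# "Month" here refers to the timestamp column created by the first withColumn below.
w = Window.orderBy(F.col("Month").cast("long")).rangeBetween(-(86400 * 89), 0)

df\
.withColumn("Month", F.to_timestamp("Month", "MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()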

+-------------------+-----+---+
|              Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00|    1|  1|
|2019-02-01 00:00:00|    0|  1|
|2019-03-01 00:00:00|    4|  5|
|2019-04-01 00:00:00|    5|  9|
|2019-05-01 00:00:00|    0|  9|
|2019-06-01 00:00:00|   10| 15|
+-------------------+-----+---+
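
One thing worth checking about the 89-day range is how many month starts it actually covers, since months differ in length. The sketch below uses plain Python dates rather than Spark, and months_in_window is a hypothetical helper written only for this check. It assumes the frame is inclusive at both ends and ignores time zones and daylight saving.

```python
from datetime import date

def months_in_window(current, all_months, days_back=89):
    """Return the month starts that fall within [current - days_back, current]."""
    return [m for m in all_months if 0 <= (current - m).days <= days_back]

# Month starts from the sample data (Jan/19 to Jun/19).
months_2019 = [date(2019, m, 1) for m in range(1, 7)]

for current in months_2019:
    covered = months_in_window(current, months_2019)
    print(current.isoformat(), len(covered), [c.isoformat() for c in covered])
```

For 2019-05-01 this lists four month starts, because 2019-02-01 is exactly 89 days earlier (28 + 31 + 30). The sample output is unaffected only because the Feb/19 value is 0, so a frame defined in months rather than days may be safer on real data.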

If you would like to go back 3 months only within each year, meaning that Jan/19 will only include the Jan/19 value, you should use a partitionBy on the year, an orderBy on the month number, and a rangeBetween of -2 and 0.

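# One partition per year, ordered by month number (1-12).
# The range -2..0 covers the current month and the two before it within the same year.
w = Window.partitionBy(F.year("Month")).orderBy(F.month("Month")).rangeBetween(-2, 0)

df\
.withColumn("Month", F.to_timestamp("Month", "MMM/yy"))\
.withColumn("Sum", F.sum("Value").over(w)).show()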

+-------------------+-----+---+
|              Month|Value|Sum|
+-------------------+-----+---+
|2019-01-01 00:00:00|    1|  1|
|2019-02-01 00:00:00|    0|  1|
|2019-03-01 00:00:00|    4|  5|
|2019-04-01 00:00:00|    5|  9|
|2019-05-01 00:00:00|    0|  9|
|2019-06-01 00:00:00|   10| 15|
+-------------------+-----+---+
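
A range of -2 to 0 on a month number counts months rather than days. As a rough illustration of what such a frame computes, here is a plain-Python sketch (not Spark). The helper rolling_sum and the month index year * 12 + month are assumptions introduced for this example, and the Dec/18 row is hypothetical data added only to show the effect of the year partition.

```python
from datetime import date

def month_index(d):
    """Map a date to a running month number so that Dec/18 and Jan/19 are adjacent."""
    return d.year * 12 + d.month

def rolling_sum(rows, months_back=2, per_year=False):
    """For each (month, value) row, sum the values whose month index lies in
    [current - months_back, current]. With per_year=True, only rows from the
    same calendar year are counted, mimicking a partition by year."""
    sums = []
    for cur_month, _ in rows:
        cur_idx = month_index(cur_month)
        total = 0
        for other_month, value in rows:
            in_range = cur_idx - months_back <= month_index(other_month) <= cur_idx
            same_year = other_month.year == cur_month.year
            if in_range and (same_year or not per_year):
                total += value
        sums.append(total)
    return sums

# Sample data from the question.
rows = [(date(2019, m, 1), v) for m, v in zip(range(1, 7), [1, 0, 4, 5, 0, 10])]
# Hypothetical extra row (not in the question) to show the year boundary.
rows_with_dec = [(date(2018, 12, 1), 7)] + rows

print(rolling_sum(rows))
print(rolling_sum(rows_with_dec))                  # window crosses into 2018
print(rolling_sum(rows_with_dec, per_year=True))   # window restarts each year
```

In Spark, the same index could presumably be used as the ordering expression, for example F.year("Month") * 12 + F.month("Month") with rangeBetween(-2, 0) and no year partition, if the window should cross year boundaries.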

Upvotes: 5
