Aussie_Stats

Reputation: 29

PySpark: iterate over dataframe to sum group based on date filters

I have a data frame that looks like the below. Not all customers have purchases recorded for every year and month combination. I want to compute, for each row, the sum of purchases made in the last 3 months, 6 months and 12 months.

I cannot insert new rows for missing months as my dataset is very large.

Things I have tried:

a) Converting year and month to a date (a rough sketch of this step is shown below).
b) Using sum with a case when, which didn't work.
c) Iterating over the rows to sum, but the date I created was a timestamp and subtracting days from it kept giving errors.
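For (a), the conversion looked roughly like this (a sketch rather than my exact code; column names match the sample data below, and the day is fixed to 1):

from pyspark.sql import functions as F

# Build a proper date column from the year/month columns, with the day fixed to 1.
df = df.withColumn(
    "Purchase_Date",
    F.to_date(F.concat_ws("-", "Purchase_Year", "Purchase_Month", F.lit("1")), "yyyy-M-d"))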

Input

Customer_ID  Purchase_Year  Purchase_Month  Purchases
1            2019           1               4
1            2019           2               6
1            2019           3               4
1            2019           4               2
2            2019           1               2
2            2019           5               3
3            2019           1               9

Expected Output

Customer_ID  Purchase_Year  Purchase_Month  Purchases  L3M
1            2019           1               4          4
1            2019           2               6          10
1            2019           3               4          14
1            2019           4               2          12
2            2019           1               2          2
2            2019           5               3          3
3            2019           1               9          9

My initial attempt, which works when every month is present but gives wrong results when months are missing (for customer 2, the row frame treats months 1 and 5 as adjacent even though they are four months apart):

sqlContext.sql("""select *,
                         sum(Purchases) over (partition by Customer_ID
                                              order by Purchase_Year, Purchase_Month
                                              rows between 2 preceding and current row) as total_s
                  from customer""").show()

Upvotes: 1

Views: 605

Answers (1)

murtihash

Reputation: 8410

I think the approach you tried earlier using timestamps was the right one, because if your last 6 months reaches back into 2018, how would you cross from 2019 into 2018 using only Purchase_Year and Purchase_Month?

You could convert the timestamp to a long (epoch seconds) and then use rangeBetween in a window function to go back as many days as you want. For your 3 months you could use 89 days back from the current day (90 days in total).

from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda i: i * 86400  # a number of days expressed in seconds

# Range window: rows of the same customer within the last 90 days (89 back plus the current day).
w = Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(89), 0)

# Build a timestamp from year + month, cast it to epoch seconds, then sum over the window.
df.withColumn("sec", F.to_timestamp(F.concat("Purchase_Year", "Purchase_Month"), "yyyyM").cast("long"))\
  .withColumn("L3", F.sum("Purchases").over(w)).orderBy("Customer_ID", "Purchase_Month").drop("sec").show()

+-----------+-------------+--------------+---------+---+
|Customer_ID|Purchase_Year|Purchase_Month|Purchases| L3|
+-----------+-------------+--------------+---------+---+
|          1|         2019|             1|        4|  4|
|          1|         2019|             2|        6| 10|
|          1|         2019|             3|        4| 14|
|          1|         2019|             4|        2| 12|
|          2|         2019|             1|        2|  2|
|          2|         2019|             5|        3|  3|
|          3|         2019|             1|        9|  9|
+-----------+-------------+--------------+---------+---+
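For the 6-month and 12-month sums you also need, the same pattern should extend naturally. The day counts below (179 and 364) are a sketch following the same convention as the 89 used above, and they reuse days, w and the imports defined earlier; adjust them if your definition of a month differs:

# Additional range windows for the 6-month and 12-month lookbacks.
w6 = Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(179), 0)
w12 = Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(364), 0)

df.withColumn("sec", F.to_timestamp(F.concat("Purchase_Year", "Purchase_Month"), "yyyyM").cast("long"))\
  .withColumn("L3", F.sum("Purchases").over(w))\
  .withColumn("L6", F.sum("Purchases").over(w6))\
  .withColumn("L12", F.sum("Purchases").over(w12))\
  .orderBy("Customer_ID", "Purchase_Month").drop("sec").show()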

Upvotes: 1
