Reputation: 35
I have tried different window functions to do this exercise, without success. Can anyone think of a different approach? I thought of adding an index column or a row_number.
year | month | week | item | department | state | sales | sum(sales)_2wks | sum(sales)_4wks | sum(sales)_6wks |
---|---|---|---|---|---|---|---|---|---|
2020 | 1 | 1 | 1 | 1 | TX | $100 | $250 | $680 | $1380 |
2020 | 1 | 2 | 1 | 1 | TX | $150 | $250 | $680 | $1380 |
2020 | 1 | 3 | 1 | 1 | TX | $200 | $430 | $680 | $1380 |
2020 | 1 | 4 | 1 | 1 | TX | $230 | $430 | $680 | $1380 |
2020 | 1 | 5 | 1 | 1 | TX | $400 | $700 | $1050 | $1380 |
2020 | 1 | 6 | 1 | 1 | TX | $300 | $700 | $1050 | $1380 |
2020 | 1 | 7 | 1 | 1 | TX | $250 | $350 | $1050 | $1200 |
2020 | 1 | 8 | 1 | 1 | TX | $100 | $350 | $1050 | $1200 |
2020 | 1 | 9 | 1 | 1 | TX | $200 | $400 | $850 | $1200 |
2020 | 1 | 10 | 1 | 1 | TX | $200 | $400 | $850 | $1200 |
2020 | 1 | 11 | 1 | 1 | TX | $300 | $450 | $850 | $1200 |
2020 | 1 | 11 | 1 | 1 | TX | $150 | $450 | $850 | $1200 |
Upvotes: 2
Views: 2214
Reputation: 247
The above solution is good, except that row_number() will give a false impression when there are multiple rows for the same week, because the bucket value (the integer division row_number / 2) should be the same for all rows of the same week. Prefer dense_rank() over row_number() and rank(): dense_rank() assigns the same rank to rows of the same week and leaves no gaps, so the buckets stay aligned with the weeks (the sketch after the code below shows the difference).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val sales_data = Seq((2020,1,1,"1","1","TX",100),
(2020,1,1,"1","1","TX",150),
(2020,1,2,"1","1","TX",150),
(2020,1,3,"1","1","TX",200),
(2020,1,4,"1","1","TX",230),
(2020,1,5,"1","1","TX",400),
(2020,1,6,"1","1","TX",300),
(2020,1,7,"1","1","TX",250),
(2020,1,8,"1","1","TX",100),
(2020,1,9,"1","1","TX",200),
(2020,1,10,"1","1","TX",200),
(2020,1,11,"1","1","TX",300),
(2020,1,11,"1","1","TX",150))
//Calculate moving sales for 2 weeks, 4 weeks, 6 weeks
val sales_df = sales_data.toDF("year", "month", "week", "item", "dept", "state", "sale")
// sales_df.show
sales_df.withColumn("row_no", dense_rank().over(Window.partitionBy("item", "state","dept").orderBy("year", "month", "week"))-1)
.withColumn("sum(sales)_2wks", sum($"sale").over(Window.partitionBy($"item", $"state",$"dept", ($"row_no"/2).cast("int"))))
.withColumn("sum(sales)_3wks", sum($"sale").over(Window.partitionBy($"item", $"state",$"dept", ($"row_no"/3).cast("int"))))
.withColumn("sum(sales)_4wks", sum($"sale").over(Window.partitionBy($"item", $"state",$"dept", ($"row_no"/4).cast("int"))))
.withColumn("sum(sales)_6wks", sum($"sale").over(Window.partitionBy($"item", $"state",$"dept", ($"row_no"/6).cast("int"))))
.show
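To see the difference concretely, here is a minimal sketch (reusing sales_df and the imports above; the bucket column names are made up for illustration) that computes the 2-week bucket with both ranking functions:

val w = Window.partitionBy("item", "state", "dept").orderBy("year", "month", "week")
sales_df
  // row_number(): the duplicate week-1 row pushes week 2 into the next bucket
  // (paired with week 3), and the two week-11 rows land in different buckets (5 and 6)
  .withColumn("rn_bucket", ((row_number().over(w) - 1) / 2).cast("int"))
  // dense_rank(): rows of the same week share a rank, so they always fall into
  // the same bucket and the week pairs stay aligned
  .withColumn("dr_bucket", ((dense_rank().over(w) - 1) / 2).cast("int"))
  .show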
Upvotes: 0
Reputation: 42352
You can assign 0-based row numbers, integer-divide them by 2/4/6 (which groups consecutive rows into fixed blocks of 2/4/6 weeks), and use the result as an extra partitioning column to sum over a window:
from pyspark.sql import functions as F, Window
result = df.withColumn(
    'rn',
    F.row_number().over(Window.partitionBy('item', 'department', 'state').orderBy('year', 'month', 'week')) - 1
).withColumn(
    'sum_2wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 2).cast('int')))
).withColumn(
    'sum_4wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 4).cast('int')))
).withColumn(
    'sum_6wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 6).cast('int')))
)
result.show()
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
|year|month|week|item|department|state|sales| rn|sum_2wks|sum_4wks|sum_6wks|
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
|2020| 1| 1| 1| 1| TX| 100| 0| 250| 680| 1380|
|2020| 1| 2| 1| 1| TX| 150| 1| 250| 680| 1380|
|2020| 1| 3| 1| 1| TX| 200| 2| 430| 680| 1380|
|2020| 1| 4| 1| 1| TX| 230| 3| 430| 680| 1380|
|2020| 1| 5| 1| 1| TX| 400| 4| 700| 1050| 1380|
|2020| 1| 6| 1| 1| TX| 300| 5| 700| 1050| 1380|
|2020| 1| 7| 1| 1| TX| 250| 6| 350| 1050| 1200|
|2020| 1| 8| 1| 1| TX| 100| 7| 350| 1050| 1200|
|2020| 1| 9| 1| 1| TX| 200| 8| 400| 850| 1200|
|2020| 1| 10| 1| 1| TX| 200| 9| 400| 850| 1200|
|2020| 1| 11| 1| 1| TX| 300| 10| 450| 850| 1200|
|2020| 1| 12| 1| 1| TX| 150| 11| 450| 850| 1200|
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
Upvotes: 1