Oscar Aguilar

Reputation: 35

How to sum every N rows over a Window in Pyspark?

I have tried different window functions for this, without success. Can anyone think of a different approach? I thought of adding an index column or a row_number. The expected output below shows sales summed over non-overlapping buckets of 2, 4, and 6 weeks:

year  month  week  item  department  state  sales  sum(sales)_2wks  sum(sales)_4wks  sum(sales)_6wks
2020  1      1     1     1           TX     $100   $250             $680             $1380
2020  1      2     1     1           TX     $150   $250             $680             $1380
2020  1      3     1     1           TX     $200   $430             $680             $1380
2020  1      4     1     1           TX     $230   $430             $680             $1380
2020  1      5     1     1           TX     $400   $700             $1050            $1380
2020  1      6     1     1           TX     $300   $700             $1050            $1380
2020  1      7     1     1           TX     $250   $350             $1050            $1200
2020  1      8     1     1           TX     $100   $350             $1050            $1200
2020  1      9     1     1           TX     $200   $400             $850             $1200
2020  1      10    1     1           TX     $200   $400             $850             $1200
2020  1      11    1     1           TX     $300   $450             $850             $1200
2020  1      11    1     1           TX     $150   $450             $850             $1200
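For reproducibility, here is a minimal PySpark sketch that builds the sample data above (column names are taken from the table; the dollar signs are dropped so sales can be summed, an assumption carried through the examples below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the table above, including the duplicated week 11.
df = spark.createDataFrame(
    [
        (2020, 1, 1, 1, 1, 'TX', 100),
        (2020, 1, 2, 1, 1, 'TX', 150),
        (2020, 1, 3, 1, 1, 'TX', 200),
        (2020, 1, 4, 1, 1, 'TX', 230),
        (2020, 1, 5, 1, 1, 'TX', 400),
        (2020, 1, 6, 1, 1, 'TX', 300),
        (2020, 1, 7, 1, 1, 'TX', 250),
        (2020, 1, 8, 1, 1, 'TX', 100),
        (2020, 1, 9, 1, 1, 'TX', 200),
        (2020, 1, 10, 1, 1, 'TX', 200),
        (2020, 1, 11, 1, 1, 'TX', 300),
        (2020, 1, 11, 1, 1, 'TX', 150),
    ],
    ['year', 'month', 'week', 'item', 'department', 'state', 'sales'],
)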

Upvotes: 2

Views: 2214

Answers (2)

Mohd Avais

Reputation: 247

The solution above is good, except that row_number will produce misleading buckets if there are multiple rows for the same week: rows for the same week should get the same value of floor(row_number / 2) and land in the same bucket, but row_number numbers them separately. Prefer dense_rank() over row_number() or rank(), since dense_rank assigns tied rows the same rank with no gaps, so duplicate weeks fall into the same bucket.
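A quick PySpark illustration of the difference, as a sketch assuming the sample df built in the question above (which has two rows for week 11):

from pyspark.sql import functions as F, Window

w = Window.partitionBy('item', 'department', 'state').orderBy('year', 'month', 'week')

# For the two week-11 rows, row_number yields 11 and 12 (different 2-week
# buckets), while dense_rank yields 11 for both (same bucket).
df.select(
    'week',
    F.row_number().over(w).alias('row_number'),
    F.dense_rank().over(w).alias('dense_rank'),
).show()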

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope in spark-shell

val sales_data = Seq(
  (2020, 1, 1, "1", "1", "TX", 100),
  (2020, 1, 1, "1", "1", "TX", 150),
  (2020, 1, 2, "1", "1", "TX", 150),
  (2020, 1, 3, "1", "1", "TX", 200),
  (2020, 1, 4, "1", "1", "TX", 230),
  (2020, 1, 5, "1", "1", "TX", 400),
  (2020, 1, 6, "1", "1", "TX", 300),
  (2020, 1, 7, "1", "1", "TX", 250),
  (2020, 1, 8, "1", "1", "TX", 100),
  (2020, 1, 9, "1", "1", "TX", 200),
  (2020, 1, 10, "1", "1", "TX", 200),
  (2020, 1, 11, "1", "1", "TX", 300),
  (2020, 1, 11, "1", "1", "TX", 150))

val sales_df = sales_data.toDF("year", "month", "week", "item", "dept", "state", "sale")

// Sum sales over non-overlapping 2-, 3-, 4- and 6-week buckets.
// dense_rank gives duplicate weeks the same rank, so they share a bucket.
sales_df
  .withColumn("row_no", dense_rank().over(Window.partitionBy("item", "state", "dept").orderBy("year", "month", "week")) - 1)
  .withColumn("sum(sales)_2wks", sum($"sale").over(Window.partitionBy($"item", $"state", $"dept", ($"row_no" / 2).cast("int"))))
  .withColumn("sum(sales)_3wks", sum($"sale").over(Window.partitionBy($"item", $"state", $"dept", ($"row_no" / 3).cast("int"))))
  .withColumn("sum(sales)_4wks", sum($"sale").over(Window.partitionBy($"item", $"state", $"dept", ($"row_no" / 4).cast("int"))))
  .withColumn("sum(sales)_6wks", sum($"sale").over(Window.partitionBy($"item", $"state", $"dept", ($"row_no" / 6).cast("int"))))
  .show()

Upvotes: 0

mck

Reputation: 42352

You can assign 0-based row numbers, integer-divide them by 2/4/6, and use the result as a partitioning column to sum over a window:

from pyspark.sql import functions as F, Window

result = df.withColumn(
    # 0-based row number within each item/department/state group
    'rn',
    F.row_number().over(Window.partitionBy('item', 'department', 'state').orderBy('year', 'month', 'week')) - 1
).withColumn(
    # rn // 2 is constant within each 2-row bucket, so partitioning
    # on it sums every 2 rows together
    'sum_2wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 2).cast('int')))
).withColumn(
    'sum_4wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 4).cast('int')))
).withColumn(
    'sum_6wks',
    F.sum('sales').over(Window.partitionBy('item', 'department', 'state', (F.col('rn') / 6).cast('int')))
)

result.show()
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
|year|month|week|item|department|state|sales| rn|sum_2wks|sum_4wks|sum_6wks|
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
|2020|    1|   1|   1|         1|   TX|  100|  0|     250|     680|    1380|
|2020|    1|   2|   1|         1|   TX|  150|  1|     250|     680|    1380|
|2020|    1|   3|   1|         1|   TX|  200|  2|     430|     680|    1380|
|2020|    1|   4|   1|         1|   TX|  230|  3|     430|     680|    1380|
|2020|    1|   5|   1|         1|   TX|  400|  4|     700|    1050|    1380|
|2020|    1|   6|   1|         1|   TX|  300|  5|     700|    1050|    1380|
|2020|    1|   7|   1|         1|   TX|  250|  6|     350|    1050|    1200|
|2020|    1|   8|   1|         1|   TX|  100|  7|     350|    1050|    1200|
|2020|    1|   9|   1|         1|   TX|  200|  8|     400|     850|    1200|
|2020|    1|  10|   1|         1|   TX|  200|  9|     400|     850|    1200|
|2020|    1|  11|   1|         1|   TX|  300| 10|     450|     850|    1200|
|2020|    1|  12|   1|         1|   TX|  150| 11|     450|     850|    1200|
+----+-----+----+----+----------+-----+-----+---+--------+--------+--------+
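If more bucket sizes are needed, the same pattern can be factored into a loop; a minimal sketch, assuming the same df and column names as above:

from pyspark.sql import functions as F, Window

order = Window.partitionBy('item', 'department', 'state').orderBy('year', 'month', 'week')
result = df.withColumn('rn', F.row_number().over(order) - 1)
for n in [2, 4, 6]:
    # rn // n is constant within each n-row bucket, so partitioning
    # on it sums every n rows together
    result = result.withColumn(
        'sum_{}wks'.format(n),
        F.sum('sales').over(
            Window.partitionBy('item', 'department', 'state', (F.col('rn') / n).cast('int'))
        ),
    )
result.show()

When the row count is not a multiple of n, the final bucket simply sums however many rows remain.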

Upvotes: 1
