xmarston

Reputation: 883

PySpark cumulative sum of two values

Given the following example dataframe:

advertiser_id| name   | amount      | total               | max_total_advertiser
4061         | source1| -434.955284 | -354882.75336200005 | -355938.53950700007
4061         | source2| -594.012216 | -355476.76557800005 | -355938.53950700007
4061         | source3| -461.773929 | -355938.53950700007 | -355938.53950700007

I need to sum the amount and max_total_advertiser fields in order to get the correct total value in each row, and I need this total for every group partitioned by advertiser_id. (The total column in the initial dataframe is incorrect, which is why I want to recalculate it.)

Something like this should work:

from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

w = Window.partitionBy("advertiser_id").orderBy("advertiser_id")

df.withColumn("total_aux", when(lag("advertiser_id").over(w) == col("advertiser_id"),
    lag("total_aux").over(w) + col("amount")).otherwise(col("max_total_advertiser") + col("amount")))

This lag("total_aux") does not work because the column has not been generated yet. What I want to achieve is: if it is the first row in the group, sum max_total_advertiser and amount in the same row; otherwise, sum the previously obtained total with the current amount field (see the plain-Python sketch after the example output below). Example output:

advertiser_id| name   | amount      | total_aux       |
4061         | source1| -434.955284 | -356373.494791  |
4061         | source2| -594.012216 | -356967.507007  |
4061         | source3| -461.773929 | -357429.280936  |
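In plain Python, the recurrence I am after would look roughly like this (just an illustrative sketch of the logic, assuming the rows are already grouped by advertiser_id and ordered, and that max_total_advertiser is constant within each group):

def running_total(rows):
    # rows: list of dicts for a single advertiser_id group, in the desired order
    totals = []
    previous = None
    for row in rows:
        if previous is None:
            # first row in the group: max_total_advertiser + amount
            previous = row["max_total_advertiser"] + row["amount"]
        else:
            # later rows: previous total + current amount
            previous = previous + row["amount"]
        totals.append(previous)
    return totals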

Thanks.

Upvotes: 1

Views: 547

Answers (2)

cronoik

Reputation: 19320

I assume that name is a distinct value for each advertiser_id and your dataset is therefore sortable by name. I also assume that max_total_advertiser contains the same value for each advertiser_id. If one of those is not the case, please add a comment.

What you need is a window with a rangeBetween frame, which limits the frame to the rows within the specified range of the ordering value. We will use Window.unboundedPreceding as the lower bound because we want to sum up all previous values, up to and including the current row.

import pyspark.sql.functions as F
from pyspark.sql import Window

# Sample data, with a second advertiser (4062) added.
l = [
    (4061, 'source1', -434.955284, -354882.75336200005, -355938.53950700007),
    (4061, 'source2', -594.012216, -355476.76557800005, -345938.53950700007),
    (4062, 'source1', -594.012216, -355476.76557800005, -5938.53950700007),
    (4062, 'source2', -594.012216, -355476.76557800005, -5938.53950700007),
    (4061, 'source3', -461.773929, -355938.53950700007, -355938.53950700007)
]

columns = ['advertiser_id', 'name', 'amount', 'total', 'max_total_advertiser']

df = spark.createDataFrame(l, columns)

# Frame: all rows from the start of the partition up to and including the current row.
w = Window.partitionBy('advertiser_id').orderBy('name').rangeBetween(Window.unboundedPreceding, 0)

# Running sum of amount plus that row's max_total_advertiser.
df = df.withColumn('total', F.sum('amount').over(w) + df.max_total_advertiser)
df.show()

Output:

+-------------+-------+-----------+-------------------+--------------------+ 
|advertiser_id|   name|     amount|              total|max_total_advertiser| 
+-------------+-------+-----------+-------------------+--------------------+ 
|         4062|source1|-594.012216|-6532.5517230000705|   -5938.53950700007| 
|         4062|source2|-594.012216| -7126.563939000071|   -5938.53950700007| 
|         4061|source1|-434.955284| -356373.4947910001| -355938.53950700007| 
|         4061|source2|-594.012216| -346967.5070070001| -345938.53950700007| 
|         4061|source3|-461.773929|-357429.28093600005| -355938.53950700007| 
+-------------+-------+-----------+-------------------+--------------------+
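You can sanity-check the 4061 rows by reproducing the running sums outside Spark (a small hand check using the sample values above):

from itertools import accumulate

# advertiser 4061, ordered by name (values taken from the sample data above)
amounts = [-434.955284, -594.012216, -461.773929]
max_totals = [-355938.53950700007, -345938.53950700007, -355938.53950700007]

# running sum of amount plus that row's max_total_advertiser
print([cum + m for cum, m in zip(accumulate(amounts), max_totals)])
# matches the total column for advertiser 4061 above (up to float formatting)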

Upvotes: 1

leermeester

Reputation: 365

You might be looking for the orderBy() function. Does this work?

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df.withColumn("cumulativeSum", F.sum(df["amount"])
              .over(Window.partitionBy("advertiser_id").orderBy("amount")))
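If you also need to match the total_aux from the question, you would still have to add max_total_advertiser on top of the running sum; a possible variation of the snippet above (just a sketch, reusing the same imports and ordering):

df.withColumn("total_aux", F.sum(df["amount"])
              .over(Window.partitionBy("advertiser_id").orderBy("amount"))
              + F.col("max_total_advertiser"))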

Upvotes: 0
