mr kw
mr kw

Reputation: 2113

Python Spark Cumulative Sum by Group Using DataFrame

How do I compute the cumulative sum per group specifically using the DataFrame abstraction; and in PySpark?

With an example dataset as follows:

df = sqlContext.createDataFrame( [(1,2,"a"),(3,2,"a"),(1,3,"b"),(2,2,"a"),(2,3,"b")], 
                                 ["time", "value", "class"] )

+----+-----+-----+
|time|value|class|
+----+-----+-----+
|   1|    2|    a|
|   3|    2|    a|
|   1|    3|    b|
|   2|    2|    a|
|   2|    3|    b|
+----+-----+-----+

I would like to add a cumulative sum column of value for each class grouping over the (ordered) time variable.

Upvotes: 61

Views: 94744

Answers (4)

I create this function in this link for my use: kolang/column_functions/cumulative_sum

def cumulative_sum(col: Union[Column, str],
                   on_col: Union[Column, str],
                   ascending: bool = True,
                   partition_by: Union[Column, str, List[Union[Column, str]]] = None) -> Column:
    on_col = on_col if ascending else F.desc(on_col)
    if partition_by is None:
        w = Window.orderBy(on_col).rangeBetween(Window.unboundedPreceding, 0)
    else:
        w = Window.partitionBy(partition_by).orderBy(on_col).rangeBetween(Window.unboundedPreceding, 0)
    return F.sum(col).over(w)

Upvotes: 0

vegetarianCoder
vegetarianCoder

Reputation: 2958

To make an update from previous answers. The correct and precise way to do is :

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rowsBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()

Upvotes: 24

Anubhav Raj
Anubhav Raj

Reputation: 51

I have tried this way and it worked for me.

from pyspark.sql import Window

from pyspark.sql import functions as f

import sys

cum_sum = DF.withColumn('cumsum', f.sum('value').over(Window.partitionBy('class').orderBy('time').rowsBetween(-sys.maxsize, 0)))
cum_sum.show()

Upvotes: 5

mr kw
mr kw

Reputation: 2113

This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    3|    b|      3|
|   2|    3|    b|      6|
|   1|    2|    a|      2|
|   2|    2|    a|      4|
|   3|    2|    a|      6|
+----+-----+-----+-------+

Upvotes: 116

Related Questions