Keerikkattu Chellappan

Reputation: 515

PySpark Lag function

The setup is below.

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
import pandas as pd

data = {'A': [2,2,2,2], 'B': [0.5, 0.5,1,1.5]}
df = pd.DataFrame(data)
ddf = spark.createDataFrame(df)

I need a column C, calculated as follows: where lag(B) is null (the first row), C = A - B; for every other row, C = lag(C) - B.

I am finding it hard to implement the second case, because column C does not exist yet at the point where lag(C) is evaluated.

(
    ddf  
    .withColumn
    (
        'C',
         F.when(F.lag(F.col('B')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))).isNull(), (F.col('A') - F.col('B')))
        .otherwise
        (               
           F.lag(F.col('C')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))) - F.col('B') 
            # cannot resolve '`C`' given input columns: [A, B]
        )                
    )  
   .show()
)

The expected result is below.

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  2|0.5| 1.5|
|  2|0.5| 1.0|
|  2|1.0| 0.0|
|  2|1.5|-1.5|
+---+---+----+

I hope the problem is clear. How can I solve this?

Upvotes: 2

Views: 1978

Answers (1)

anky

Reputation: 75150

You are looking to subtract the cumulative sum of column B from A, so no reference to the previous value of C is needed. Try something like the code below.

Note that I have ordered by a monotonically increasing ID. You can replace it with whichever ordering column you want to keep, and add a partitionBy if you have a partition column.

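# Running frame: everything from the start of the ordering up to the current row
w = Window.orderBy(F.monotonically_increasing_id())\
          .rangeBetween(Window.unboundedPreceding,0)

# C = A minus the cumulative sum of B over that frame
ddf.withColumn("C",F.col("A")-F.sum("B").over(w)).show()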

+---+---+----+
|  A|  B|   C|
+---+---+----+
|  2|0.5| 1.5|
|  2|0.5| 1.0|
|  2|1.0| 0.0|
|  2|1.5|-1.5|
+---+---+----+
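
To see why the cumulative sum reproduces the recursive definition from the question, here is a minimal plain-Python sketch (no Spark needed) that computes C both ways on the sample data. The helper names recursive_c and cumulative_c are made up for illustration and are not part of PySpark.

```python
from itertools import accumulate

# Sample data from the question
A = [2, 2, 2, 2]
B = [0.5, 0.5, 1, 1.5]


def recursive_c(a_values, b_values):
    """C as described in the question: first row is A - B,
    every later row is the previous C minus the current B."""
    result = []
    for i, (a, b) in enumerate(zip(a_values, b_values)):
        if i == 0:
            result.append(a - b)
        else:
            result.append(result[-1] - b)
    return result


def cumulative_c(a_values, b_values):
    """C as computed in the answer: A minus the running total of B."""
    running_totals = accumulate(b_values)
    return [a - total for a, total in zip(a_values, running_totals)]


recursive = recursive_c(A, B)
cumulative = cumulative_c(A, B)
print(recursive)   # [1.5, 1.0, 0.0, -1.5]
print(cumulative)  # [1.5, 1.0, 0.0, -1.5]
```

One caveat: the recursive form only ever uses A from the first row, while the cumulative form uses A from each row. The two agree when A is constant within the group, as it is in the sample data.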

Upvotes: 1
