Reputation: 515
The setup is as below.
from pyspark.sql import Row, SparkSession, functions as F
from pyspark.sql.window import Window
import pandas as pd

spark = SparkSession.builder.getOrCreate()  # already available as `spark` in the pyspark shell / notebooks

data = {'A': [2, 2, 2, 2], 'B': [0.5, 0.5, 1, 1.5]}
df = pd.DataFrame(data)
ddf = spark.createDataFrame(df)
I need a column C. The logic for calculating it is: when the previous row does not exist (the first row of the group), C = A - B; otherwise, C = lag(C) - B. I am finding it hard to fulfill the second case, because lag(C) does not exist in that context: C is the very column being computed.
(
    ddf
    .withColumn(
        'C',
        F.when(
            F.lag(F.col('B')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))).isNull(),
            F.col('A') - F.col('B')
        )
        .otherwise(
            F.lag(F.col('C')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))) - F.col('B')
            # cannot resolve '`C`' given input columns: [A, B]
        )
    )
    .show()
)
The correct result should be as below.
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  2|0.5| 1.5|
|  2|0.5| 1.0|
|  2|1.0| 0.0|
|  2|1.5|-1.5|
+---+---+----+
I hope the problem is clear: the first row gives C = A - B = 2 - 0.5 = 1.5, and each following row gives C = lag(C) - B (1.5 - 0.5 = 1.0, 1.0 - 1.0 = 0.0, 0.0 - 1.5 = -1.5). How can I solve this scenario?
Upvotes: 2
Views: 1978
Reputation: 75150
You are looking to subtract the cumulative sum of column B from A: since the first C is A - B and every later C subtracts the next B from the previous C, C is just A minus the running total of B. Spark cannot reference lag(C) while C is still being computed, but a cumulative sum expresses the same recursion. Try something like the below.
Note that I have used monotonically_increasing_id() as the ordering, since a DataFrame has no inherent row order; replace it with the ordering column you actually want to keep, and add a partitionBy if you have a partition column (a partitioned variant is sketched after the output below).
w = Window.orderBy(F.monotonically_increasing_id())\
.rangeBetween(Window.unboundedPreceding,0)
ddf.withColumn("C",F.col("A")-F.sum("B").over(w)).show()
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  2|0.5| 1.5|
|  2|0.5| 1.0|
|  2|1.0| 0.0|
|  2|1.5|-1.5|
+---+---+----+
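If A is actually meant to be a partition key, as the original attempt with partitionBy suggests, a minimal sketch of the same cumulative-sum idea with a partition could look like the below (monotonically_increasing_id() is again only a stand-in for a real ordering column, and rowsBetween is used instead of rangeBetween):

w = (Window.partitionBy('A')
     .orderBy(F.monotonically_increasing_id())
     .rowsBetween(Window.unboundedPreceding, 0))  # frame: first row of the group up to the current row

ddf.withColumn('C', F.col('A') - F.sum('B').over(w)).show()

With the sample data every row has A = 2, so this should produce the same output as above.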
Upvotes: 1