I have the DataFrame below, ordered by "col1".
+----+----+
|col1|col2|
+----+----+
| a| x|
| a| x|
| a| y|
| b| x|
| b| z|
| c| x|
| c| y|
| d| z|
| d| x|
+----+----+
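For reference, a minimal snippet to reproduce this sample DataFrame (assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [("a", "x"), ("a", "x"), ("a", "y"),
     ("b", "x"), ("b", "z"),
     ("c", "x"), ("c", "y"),
     ("d", "z"), ("d", "x")],
    ["col1", "col2"])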
I want to add a new column, say "col3", such that for every row within each group ('a', 'b', 'c', 'd') in "col1": if the "col2" value is 'x' or 'y', increment a running count by 1; if the value is 'z' (or anything else), carry the previous value over. For example, in the first row for "a", col2 is x, so we increment 0 + 1 = 1; in the second row col2 is again x, so 1 + 1 = 2, and so on. For the second group, where col1 is "b" (4th row), we start fresh: col2 is x, so 0 + 1 = 1. In the 5th row col2 is z, so we don't increment and keep the previous value, i.e. 1. In the case of "d" (8th row), col2 is not x or y, so we don't increment and the count stays 0.
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| x| 1|
| a| x| 2|
| a| y| 3|
| b| x| 1|
| b| z| 1|
| c| x| 1|
| c| y| 2|
| d| z| 0|
| d| x| 1|
+----+----+----+
Is there any way I can achieve this without using UDFs in PySpark?
Upvotes: 0
Views: 476
Use a window partitioned by "col1" and build the new column as a running sum of a conditional expression over that window:
from pyspark.sql import Window
from pyspark.sql.functions import col, when, sum as sum_  # sum_ avoids shadowing Python's built-in sum

# Frame: all rows from the start of each "col1" partition up to the current row
w = Window.partitionBy("col1").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 1 for 'x'/'y', 0 otherwise; the running sum carries the count over 'z' rows unchanged
df.withColumn("col3", sum_(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)).orderBy("col1").show()
This produces exactly the output you want:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| x| 1|
| a| x| 2|
| a| y| 3|
| b| x| 1|
| b| z| 1|
| c| x| 1|
| c| y| 2|
| d| z| 0|
| d| x| 1|
+----+----+----+
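One caveat: without an orderBy in the window, the running sum depends on whatever row order Spark happens to preserve within each partition, which is not guaranteed after shuffles. If the original order matters, a sketch of one way to pin it down (the "_id" column name is just an illustrative choice) is:

from pyspark.sql import Window
from pyspark.sql.functions import col, when, monotonically_increasing_id
from pyspark.sql.functions import sum as sum_

# Tag each row with an increasing id before any shuffle, then order the window by it
df_idx = df.withColumn("_id", monotonically_increasing_id())
w = Window.partitionBy("col1").orderBy("_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_idx.withColumn("col3", sum_(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)) \
    .orderBy("_id").drop("_id").show()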
Upvotes: 4