taurz

Reputation: 194

Create a column based on a condition, carrying over previous values

I have the dataframe below, which is ordered by "col1".

+----+----+
|col1|col2|
+----+----+
|   a|   x|
|   a|   x|
|   a|   y|
|   b|   x|
|   b|   z|
|   c|   x|
|   c|   y|
|   d|   z|
|   d|   x|
+----+----+
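For reference, the sample dataframe can be built like this (a minimal sketch, assuming an active SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Rows in the same order as shown above.
data = [
    ("a", "x"), ("a", "x"), ("a", "y"),
    ("b", "x"), ("b", "z"),
    ("c", "x"), ("c", "y"),
    ("d", "z"), ("d", "x"),
]
df = spark.createDataFrame(data, ["col1", "col2"])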

I want to add a new column, say "col3", computed as follows: within each group of "col1" values ('a', 'b', 'c', 'd'), if the "col2" value is 'x' or 'y', increment the running count by 1; if the value is 'z' (or anything else), carry over the previous value. For example, in the first row for "a", since col2 is x we increment: 0 + 1 = 1; in the second row col2 is again x, so 1 + 1 = 2, and so on. For the second group, where col1 is b (4th row), we start over: since col2 is x we increment 0 + 1 = 1. In the 5th row col2 is z, so we don't increment and keep the previous value, i.e. 1. In the case of "d" (8th row), since col2 is not x or y, we don't increment and the value stays 0.

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+

Is there any way I can achieve this without using UDFs in PySpark?

Upvotes: 0

Views: 476

Answers (1)

Lamanus

Reputation: 13541

Use a window partitioned by "col1", then build the new column by summing a conditional expression over that window.

from pyspark.sql.functions import *
from pyspark.sql import Window

# Running frame: everything from the start of the partition up to the current row.
w = Window.partitionBy("col1").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Emit 1 for 'x'/'y' and 0 otherwise, then take the running sum over the frame.
df.withColumn("col3", sum(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)) \
  .orderBy("col1").show(10)

This produces exactly the output you want:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   x|   1|
|   a|   x|   2|
|   a|   y|   3|
|   b|   x|   1|
|   b|   z|   1|
|   c|   x|   1|
|   c|   y|   2|
|   d|   z|   0|
|   d|   x|   1|
+----+----+----+
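One caveat worth noting (not part of the original answer): without an orderBy in the window, Spark does not guarantee the row order within each partition, so on a larger or repartitioned dataset the running sum may not follow your intended order. A sketch of making the order explicit, assuming you capture the current order first with monotonically_increasing_id() (the column name "rid" is just illustrative):

from pyspark.sql.functions import monotonically_increasing_id

# Capture the current row order in an explicit column before windowing.
df2 = df.withColumn("rid", monotonically_increasing_id())

w = Window.partitionBy("col1").orderBy("rid") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df2.withColumn("col3", sum(when(col("col2").isin("x", "y"), 1).otherwise(0)).over(w)) \
   .drop("rid").orderBy("col1").show(10)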

Upvotes: 4
