Anand K

Reputation: 283

spark scala transform a dataframe/rdd

I have a CSV file like below.

PK,key,Value
100,col1,val11
100,col2,val12
100,idx,1
100,icol1,ival11
100,icol3,ival13
100,idx,2
100,icol1,ival21
100,icol2,ival22
101,col1,val21
101,col2,val22
101,idx,1
101,icol1,ival11
101,icol3,ival13
101,idx,3
101,icol1,ival31
101,icol2,ival32

I want to transform this into the following.

PK,idx,key,Value
100,,col1,val11
100,,col2,val12
100,1,idx,1
100,1,icol1,ival11
100,1,icol3,ival13
100,2,idx,2
100,2,icol1,ival21
100,2,icol2,ival22
101,,col1,val21
101,,col2,val22
101,1,idx,1
101,1,icol1,ival11
101,1,icol3,ival13
101,3,idx,3
101,3,icol1,ival31
101,3,icol2,ival32

Basically I want to create a new column called idx in the output dataframe: every row that follows a key=idx, Value="n" row should get that same value "n" in the idx column (rows before the first such marker are left empty).
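
For context, this is roughly how the file is loaded into a DataFrame (the path and SparkSession setup below are placeholders, not part of the actual job):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("csv-transform")
  .master("local[*]")
  .getOrCreate()

// read with a header so the columns come in as PK, key, Value
val df = spark.read
  .option("header", "true")
  .csv("/path/to/input.csv")

df.show(false)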

Upvotes: 1

Views: 86

Answers (1)

abiratsis

Reputation: 7336

Here is one way using the last window function with Spark >= 2.0.0:

import org.apache.spark.sql.functions.{last, when, lit}
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // needed for the $"col" syntax (auto-imported in spark-shell)

// window over each PK, from the start of the partition up to the current row
val w = Window.partitionBy("PK").rowsBetween(Window.unboundedPreceding, 0)

df.withColumn("idx", when($"key" === lit("idx"), $"Value")) // idx = Value on marker rows, null otherwise
  .withColumn("idx", last($"idx", true).over(w))            // carry the last non-null idx forward
  .orderBy($"PK")
  .show

Output:

+---+-----+------+----+
| PK|  key| Value| idx|
+---+-----+------+----+
|100| col1| val11|null|
|100| col2| val12|null|
|100|  idx|     1|   1|
|100|icol1|ival11|   1|
|100|icol3|ival13|   1|
|100|  idx|     2|   2|
|100|icol1|ival21|   2|
|100|icol2|ival22|   2|
|101| col1| val21|null|
|101| col2| val22|null|
|101|  idx|     1|   1|
|101|icol1|ival11|   1|
|101|icol3|ival13|   1|
|101|  idx|     3|   3|
|101|icol1|ival31|   3|
|101|icol2|ival32|   3|
+---+-----+------+----+

The code first creates a new column called idx that contains the value of Value when key == idx, and null otherwise. Then last with ignoreNulls = true carries the most recent non-null idx forward over the defined window, i.e. within each PK partition up to the current row.
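
If the columns should appear exactly as in the requested layout (PK, idx, key, Value), a select on top of the same result does it. This is just a small sketch reusing df and w from the snippet above, not part of the original answer:

// reorder columns to match the requested layout: PK, idx, key, Value
df.withColumn("idx", when($"key" === lit("idx"), $"Value"))
  .withColumn("idx", last($"idx", true).over(w))
  .select($"PK", $"idx", $"key", $"Value")
  .orderBy($"PK")
  .show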

Upvotes: 1
