Welsige

Reputation: 161

Scala modify value of nested column

I need to conditionally modify the value of a nested field in a DataFrame (or create a new field from the nested values). I would like to do this without a UDF, and I especially want to avoid RDD/map, since the production tables can hold hundreds of millions of records and map does not strike me as efficient at that scale.

Below is the test case:


case class teste(var testID: Int  = 0, var testDesc: String = "", var testValue: String = "")


val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue")).
        withColumn("F2.testValue",$"NewValue")

DFJoined.show()

This adds a new top-level column, but what I need is for F2.testValue inside the struct to take the value of NewValue when NewValue is 50 or above.

Original Data:

+---+------------+
| F1|          F2|
+---+------------+
|  A|[1, AAA, 10]|
|  B|[2, BBB, 20]|
|  C|[3, CCC, 30]|
+---+------------+

Desired Result:

+---+------------+
| F1|          F2|
+---+------------+
|  A|[1, AAA, 10]|
|  B|[2, BBB, 50]|
|  C|[3, CCC, 60]|
+---+------------+

Upvotes: 2

Views: 214

Answers (2)

Welsige

Reputation: 161

In addition to stack0114106's answer, I also found the solution below. The two approaches are more or less alike:


val DFFinal = DFJoined.selectExpr("""
        named_struct(
          'F1', F1,
          'F2', named_struct(
            'testID', F2.testID,
            'testDesc', F2.testDesc,
            'testValue', case when NewValue>=50 then NewValue else F2.testValue end
            )
        ) as named_struct
      """).select($"named_struct.F1", $"named_struct.F2")

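For comparison, the same update can also be written with the Column API instead of a SQL string. This is a minimal sketch and not part of either answer: it assumes a local SparkSession, casts NewValue to int explicitly (the answers rely on an implicit cast), and reuses the names from the question. In spark-shell it should be pasted with :paste.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, when}

// Assumption: a local session for experimentation; in spark-shell, getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("nested-update-sketch").getOrCreate()
import spark.implicits._

case class teste(testID: Int = 0, testDesc: String = "", testValue: String = "")

val DFMain = Seq(("A", teste(1, "AAA", "10")), ("B", teste(2, "BBB", "20")), ("C", teste(3, "CCC", "30"))).toDF("F1", "F2")
val DFNewData = Seq(("A", teste(1, "AAA", "40")), ("B", teste(2, "BBB", "50")), ("C", teste(3, "CCC", "60"))).toDF("F1", "F2")

val DFJoined = DFMain.join(DFNewData, DFMain("F2.testID") === DFNewData("F2.testID"), "left")
  .select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue"))

// Rebuild F2 field by field; only testValue is conditional.
// A null NewValue (no match in the left join) falls through to otherwise().
val DFFinal = DFJoined.select(
  $"F1",
  struct(
    $"F2.testID".as("testID"),
    $"F2.testDesc".as("testDesc"),
    when($"NewValue".cast("int") >= 50, $"NewValue").otherwise($"F2.testValue").as("testValue")
  ).as("F2")
)

DFFinal.show()
```

On Spark 3.1 or later, Column.withField can replace a single struct field without listing the others, which may be more convenient for wide structs.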
Upvotes: 1

stack0114106

Reputation: 8711

Could you please try this?

case class teste(var testID: Int  = 0, var testDesc: String = "", var testValue: String = "")
val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
DFMain.show(false)

+---+------------+
|F1 |F2          |
+---+------------+
|A  |[1, AAA, 10]|
|B  |[2, BBB, 20]|
|C  |[3, CCC, 30]|
+---+------------+

val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue"))
  .withColumn("F2_testValue",$"NewValue")

DFJoined.show

+---+------------+--------+------------+
| F1|          F2|NewValue|F2_testValue|
+---+------------+--------+------------+
|  A|[1, AAA, 10]|      40|          40|
|  B|[2, BBB, 20]|      50|          50|
|  C|[3, CCC, 30]|      60|          60|
+---+------------+--------+------------+

DFJoined.printSchema

root
 |-- F1: string (nullable = true)
 |-- F2: struct (nullable = true)
 |    |-- testID: integer (nullable = false)
 |    |-- testDesc: string (nullable = true)
 |    |-- testValue: string (nullable = true)
 |-- NewValue: string (nullable = true)
 |-- F2_testValue: string (nullable = true)

DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then concat_ws('|',F2.testID,F2.testDesc,F2_testValue) else concat_ws('|',F2.testID,F2.testDesc,F2.testValue) end "))
.withColumn("f2_new3",struct(split($"f2_new","[|]")(0),split($"f2_new","[|]")(1),split($"f2_new","[|]")(2) ) )
.show(false)

+---+------------+--------+------------+--------+------------+
|F1 |F2          |NewValue|F2_testValue|f2_new  |f2_new3     |
+---+------------+--------+------------+--------+------------+
|A  |[1, AAA, 10]|40      |40          |1|AAA|10|[1, AAA, 10]|
|B  |[2, BBB, 20]|50      |50          |2|BBB|50|[2, BBB, 50]|
|C  |[3, CCC, 30]|60      |60          |3|CCC|60|[3, CCC, 60]|
+---+------------+--------+------------+--------+------------+

f2_new3 is the desired output.
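One caveat with the split-based struct: as far as I know, struct() over unnamed expressions produces generated field names (col1, col2, col3), and every field comes back as a string, so the original testID/testDesc/testValue schema is lost. The sketch below shows one way to restore names and types with as() and cast(). The DFPiped frame is a hypothetical stand-in for the f2_new column above, and the local SparkSession is an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{split, struct}

// Assumption: a local session for experimentation.
val spark = SparkSession.builder().master("local[*]").appName("split-struct-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the pipe-delimited f2_new column built with concat_ws.
val DFPiped = Seq("1|AAA|10", "2|BBB|50", "3|CCC|60").toDF("f2_new")

// Split once, then name and type each piece explicitly.
val parts = split($"f2_new", "[|]")
val DFTyped = DFPiped.withColumn("f2_new3", struct(
  parts(0).cast("int").as("testID"),
  parts(1).as("testDesc"),
  parts(2).as("testValue")
))

DFTyped.printSchema()
```

The round trip through a delimited string also breaks if testDesc can itself contain the '|' character, so it is only safe when the delimiter cannot occur in the data.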

The workaround is needed because the more direct version below does not work:

DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then struct(F2.testID,F2.testDesc,F2_testValue) else struct(F2.testID,F2.testDesc,F2.testValue) end ")).show()
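A likely cause, which the answer does not confirm, is that the two struct() calls produce different struct types: the last field is named F2_testValue in one branch and testValue in the other, and CASE requires both branches to have the same type. Under that assumption, giving both branches identical field names with named_struct should let the direct version work. The sketch below builds a stand-in for DFJoined from flat columns; the local SparkSession and the use of NewValue in place of F2_testValue are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, struct}

// Assumption: a local session for experimentation.
val spark = SparkSession.builder().master("local[*]").appName("case-struct-sketch").getOrCreate()
import spark.implicits._

// Stand-in for DFJoined: F2 as a struct plus the joined-in NewValue.
val DFJoined = Seq(
  ("A", 1, "AAA", "10", "40"),
  ("B", 2, "BBB", "20", "50"),
  ("C", 3, "CCC", "30", "60")
).toDF("F1", "testID", "testDesc", "testValue", "NewValue")
  .select($"F1", struct($"testID", $"testDesc", $"testValue").as("F2"), $"NewValue")

// Both branches now yield struct<testID, testDesc, testValue>, so CASE sees a single type.
val DFFixed = DFJoined.withColumn("f2_new", expr("""
  case when cast(NewValue as int) >= 50
    then named_struct('testID', F2.testID, 'testDesc', F2.testDesc, 'testValue', NewValue)
    else named_struct('testID', F2.testID, 'testDesc', F2.testDesc, 'testValue', F2.testValue)
  end
"""))

DFFixed.show(false)
```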

Upvotes: 3
