karoma

Reputation: 1558

Updating some row values in a Spark DataFrame

I have a dataframe which I want to merge into another dataframe, but only to affect specific cells rather than a whole row.

Old dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## +---+----+----+

New dataframe:

## +---+----+
## |key|val1|
## +---+----+
## |  2| bbb|
## +---+----+

Result:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2| bbb|  bc|
## +---+----+----+

The key is unique in this case, so the row to be affected will always be identifiable. The old dataframe will also always contain every key that appears in the new dataframe.

As dataframes are immutable, I'll have to call withColumn to create a new one, presumably by passing in some sort of UDF, but I'm a bit lost as to what that UDF should contain.

Upvotes: 3

Views: 5046

Answers (1)

eliasah

Reputation: 40380

You need to use an outer join to get the expected output:

scala> val oldDf = Seq((1, "aa", "ab"), (2, "bb", "bc")).toDF("key", "val1", "val2").as("old")
// oldDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string ... 1 more field]
scala> val newDf = Seq((2, "bbb")).toDF("key", "val1").as("new")
// newDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: int, val1: string]

scala> oldDf.join(newDf, Seq("key"), "outer").select($"key", coalesce($"new.val1", $"old.val1").alias("val1"), $"val2").show
// +---+----+----+
// |key|val1|val2|
// +---+----+----+
// |  1|  aa|  ab| 
// |  2| bbb|  bc|
// +---+----+----+

Note: coalesce will select the first non-null value between new.val1 and old.val1.
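
If the new dataframe carries more than one column to update, the same outer-join + coalesce pattern can be applied to every shared column. Here is a minimal sketch of one way to generalize it; the helper names (updateCols, selected) are just illustrative, and it assumes newDf's columns are a subset of oldDf's with "key" as the join key:

// Coalesce each column present in newDf (other than the key) against the
// old value; pass the other columns through unchanged.
import org.apache.spark.sql.functions.{coalesce, col}

val updateCols = newDf.columns.filter(_ != "key")
val selected = oldDf.columns.map { c =>
  if (updateCols.contains(c)) coalesce(col(s"new.$c"), col(s"old.$c")).alias(c)
  else col(c)
}

oldDf.join(newDf, Seq("key"), "outer").select(selected: _*).show()

This relies on the `.as("old")` and `.as("new")` aliases above to disambiguate the columns that exist on both sides of the join.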

Upvotes: 4
