blue20090

Reputation: 63

How to join two DataFrames and update missing values?

I use Spark 2.0 and want to update/merge row values in a DataFrame.

I have two DataFrames (old and new) that I want to merge so that the result takes its values from the new DataFrame, and any row that exists only in the old DataFrame has its values set to 0.

Case 1: Merge

Old dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## +---+----+----+

New dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

Result:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

Case 2: Update

Old dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bb|
## |  3|  cc|  cc|
## +---+----+----+

New dataframe:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## +---+----+----+

Result:

## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## |  1|  aa|  ab|
## |  2|  bb|  bc|
## |  3|  00|  00|
## +---+----+----+

The key is unique in both cases, and in the real case the DataFrames can have many columns.

How can I write Spark/Scala code that handles both cases in one function?

Upvotes: 2

Views: 3590

Answers (2)

Alberto Bonsanto

Reputation: 18022

The trick is to use a full outer join and a when condition.

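import org.apache.spark.sql.functions._
// toDF and $ need the session implicits (already imported in spark-shell)
import spark.implicits._

// dfa plays the role of the old DataFrame, dfb the new one
val dfa = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")

val dfb = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb")).toDF("key", "val1", "val2")

// Full outer join on key; rows missing from dfb come back with null values,
// which are replaced by "0" (a String, matching the column type)
val q = dfa
  .join(dfb, Seq("key"), "outer")
  .select($"key",
     when(dfb("val1").isNull, lit("0")).otherwise(dfb("val1")).as("val1"),
     when(dfb("val2").isNull, lit("0")).otherwise(dfb("val2")).as("val2"))
  .orderBy("key")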

scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
|  1|  aa|  ab|
|  2|  bb|  bb|
|  3|   0|   0|
+---+----+----+
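
Since the question mentions many columns and asks for a single function, the same join can be generalized by building the select list from the column names. The sketch below is one possible way to do that and is not part of the original answer: mergeOrFill, filler, and the aliases o and n are hypothetical names, and it assumes both DataFrames share a schema and every non-key column is a String. It tests the new side's key for null rather than each value, so a genuine null value in the new DataFrame is left alone.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit, when}

// In spark-shell a session already exists; getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[*]").appName("merge-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper (not from the answer). Assumes both DataFrames share a
// schema and that every non-key column is a String.
def mergeOrFill(oldDF: DataFrame, newDF: DataFrame, key: String, filler: String = "0"): DataFrame = {
  val o = oldDF.as("o")
  val n = newDF.as("n")
  // A key found on either side survives the full outer join.
  val keyCol: Column = coalesce(col(s"n.$key"), col(s"o.$key")).as(key)
  // Take each value from the new side; use the filler when the new side has no row.
  val valueCols: Seq[Column] = newDF.columns.toSeq.filter(_ != key).map { c =>
    when(col(s"n.$key").isNull, lit(filler)).otherwise(col(s"n.$c")).as(c)
  }
  val selected: Seq[Column] = keyCol +: valueCols
  o.join(n, col(s"o.$key") === col(s"n.$key"), "outer")
    .select(selected: _*)
    .orderBy(key)
}

val oldDF = Seq((1, "aa", "ab"), (2, "bb", "bb"), (3, "cc", "cc")).toDF("key", "val1", "val2")
val newDF = Seq((1, "aa", "ab"), (2, "bb", "bc")).toDF("key", "val1", "val2")

// Case 2 from the question: key 3 exists only in the old DataFrame.
val merged = mergeOrFill(oldDF, newDF, "key")
merged.show()
```

With the Case 2 data from the question this should yield the rows (1, aa, ab), (2, bb, bc) and (3, 0, 0). Passing the Case 1 data instead should return the new DataFrame's three rows unchanged, because every key is present on the new side.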

Upvotes: 4

Jacek Laskowski

Reputation: 74669

As @summerbulb suggested in the comments, you should use the na operator to fill the missing values.

Note that I use the as operator to give the new DataFrame an alias, so that its columns can be selected with new.*.

val oldDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")
val newDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bc")).toDF("key", "val1", "val2")
val q = oldDF.join(newDF.as("new"), Seq("key"), "outer")
  .select("key", "new.*")
  .na.fill("0")  // <-- na.fill("0") because of String type
  .orderBy("key")

scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
|  1|  aa|  ab|
|  2|  bb|  bc|
|  3|   0|   0|
+---+----+----+

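Depending on the column type, you may want to fill with 0 as a String or as a Double: na.fill("0") only replaces nulls in String columns, while na.fill(0.0) only replaces nulls in numeric columns.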
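
To illustrate the point about types: na.fill also accepts a Map from column name to replacement value, which lets each column get a filler of its own type in one call. What follows is a minimal sketch, assuming a joined result with one String column and one Double column. The score column and the sample rows are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell a session already exists; getOrCreate() reuses it.
val spark = SparkSession.builder().master("local[*]").appName("na-fill-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for a joined result: key 3 had no match, so its values are null.
val joined = Seq[(Int, Option[String], Option[Double])](
  (1, Some("aa"), Some(1.5)),
  (3, None, None)
).toDF("key", "val1", "score")

// One replacement per column, each matching that column's type.
val fills: Map[String, Any] = Map("val1" -> "0", "score" -> 0.0)
val filled = joined.na.fill(fills).orderBy("key")
filled.show()
```

Columns not named in the map are left untouched, so this form is also a way to avoid filling columns where a null is meaningful.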

Read up on as and na in the scaladoc of Dataset.

Upvotes: 1
