Reputation: 63
I use Spark 2.0 and want to update/merge row values in a DataFrame.
I have two DataFrames (an old one and a new one) that I want to merge so that the new values win, and any key that exists in the old DataFrame but not in the new one gets the value 0.
Old dataframe:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bc|
## +---+----+----+
New dataframe:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bb|
## | 3| cc| cc|
## +---+----+----+
Result:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bb|
## | 3| cc| cc|
## +---+----+----+
Old dataframe:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bb|
## | 3| cc| cc|
## +---+----+----+
New dataframe:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bc|
## +---+----+----+
Result:
## +---+----+----+
## |key|val1|val2|
## +---+----+----+
## | 1| aa| ab|
## | 2| bb| bc|
## | 3| 00| 00|
## +---+----+----+
The key is unique in both cases, and in the real case the DataFrames can have many columns.
How can I write Spark/Scala code that handles both cases in one function?
Upvotes: 2
Views: 3590
Reputation: 18022
The trick is to use a full outer join and a when condition.
import org.apache.spark.sql.functions._

val dfa = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")

val dfb = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb")).toDF("key", "val1", "val2")

val q = dfa
  .join(dfb, Seq("key"), "outer")
  .select($"key",
    when(dfb("val1").isNull, lit(0)).otherwise(dfb("val1")).as("val1"),
    when(dfb("val2").isNull, lit(0)).otherwise(dfb("val2")).as("val2"))
  .orderBy("key")
scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
| 1| aa| ab|
| 2| bb| bb|
| 3| 0| 0|
+---+----+----+
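The same pattern generalizes to any number of value columns by building the when/otherwise expressions from the new DataFrame's column list instead of writing one per column by hand. A minimal sketch, assuming string-typed value columns; mergeWithDefault is a hypothetical helper name, not part of any Spark API:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Keep the new DataFrame's values; keys present only in the old
// DataFrame get "0" in every value column. The value columns are
// discovered dynamically, so this works for many columns.
def mergeWithDefault(oldDF: DataFrame, newDF: DataFrame, key: String): DataFrame = {
  val valueCols = newDF.columns.filterNot(_ == key)
  val selected = col(key) +: valueCols.map { c =>
    when(newDF(c).isNull, lit("0")).otherwise(newDF(c)).as(c)
  }
  oldDF.join(newDF, Seq(key), "outer")
    .select(selected: _*)
    .orderBy(key)
}
```

Calling mergeWithDefault(dfa, dfb, "key") would reproduce the result above without hardcoding val1 and val2.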
Upvotes: 4
Reputation: 74669
As @summerbulb suggested in the comments, you should use the na operator to fill in the missing values.
Note that I use the as operator to give the joined columns an alias.
val oldDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bb"),
  (3, "cc", "cc")).toDF("key", "val1", "val2")

val newDF = Seq(
  (1, "aa", "ab"),
  (2, "bb", "bc")).toDF("key", "val1", "val2")

val q = oldDF.join(newDF.as("new"), Seq("key"), "outer")
  .select("key", "new.*")
  .na.fill("0") // <-- na.fill("0") because the columns are of String type
  .orderBy("key")
scala> q.show
+---+----+----+
|key|val1|val2|
+---+----+----+
| 1| aa| ab|
| 2| bb| bc|
| 3| 0| 0|
+---+----+----+
Depending on the type of the column, you may want to use 0 as a String or as a Double.
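If the DataFrames mix column types, na.fill also accepts a Map from column name to replacement value, so you can pick a type-appropriate default per column. A sketch, assuming a joined DataFrame named joined with a String column val1 and a Double column score (both illustrative names, not from the question):

```scala
// na.fill with a Map applies a per-column default; each replacement's
// type must be compatible with the column's type.
val filled = joined.na.fill(Map(
  "val1"  -> "0",  // String column -> "0"
  "score" -> 0.0)) // Double column -> 0.0
```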
Read up on as and na in the scaladoc of Dataset.
Upvotes: 1