ashK
ashK

Reputation: 733

How to join two DataFrames and update missing values with multiple primary keys involved?

Case 1 Merge

Old dataframe:

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1|  aa|  ab| ac|
## |  2|  bb|  bc| bd|
## +---+----+----+---+

New dataframe:

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1|  aa|  ab| ad|
## |  2|  bb|  bb| bd|
## |  3|  cc|  cc| cc|
## +---+----+----+---+

Result:

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1|  aa|  ab| ad|
## |  2|  bb|  bb| bd|
## |  3|  cc|  cc| cc|
## +---+----+----+---+

Does the outer join with multiple keys will work?

Upvotes: 1

Views: 409

Answers (1)

Leo C
Leo C

Reputation: 22449

From your sample data, I take it that elements from the new dataframe will be picked over the old dataframe should they be different.

[UPDATE] With val-columns being dynamic, you can apply foldLeft to the column list as follows:

val dfOld = Seq(
  (1, "aa", "ab", "ac"),
  (2, "bb", "bc", "bd")
).toDF("pk1", "pk2", "val1", "val2")

val dfNew = Seq(
  (1, "aa", "ab", "ad"),
  (2, "bb", "bb", "bd"),
  (3, "cc", "cc", "cc")
).toDF("pk1", "pk2", "val1", "val2")

// Assemble the list of selected val-columns
val valColumns = dfNew.columns.filter(x => x != "pk1" && x != "pk2")

val dfJoined = dfNew.join(dfOld, Seq("pk1", "pk2"), "left_outer")

// Generate diff-columns from the val-column list
val dfDiff = valColumns.foldLeft(dfJoined)( (acc, x ) =>
  acc.withColumn(
    x + "diff",
    when( !(dfNew(x) === dfOld(x)) || (dfOld(x).isNull), dfNew(x) ).otherwise( null )
  ).
  drop(x)
)

dfDiff.show
+---+---+--------+--------+
|pk1|pk2|val1diff|val2diff|
+---+---+--------+--------+
|  1| aa|    null|      ad|
|  2| bb|      bb|    null|
|  3| cc|      cc|      cc|
+---+---+--------+--------+

Upvotes: 1

Related Questions