Deepak Kumar

Reputation: 17

Perform merge/insert on two spark dataframes with different schemas?

I have two Spark DataFrames, DF and DF1, with different schemas.

DF:-

val DF  = Seq(("1","acv","34","a","1"),("2","fbg","56","b","3"),("3","rty","78","c","5")).toDF("id","name","age","DBName","test")

+---+----+---+------+----+
| id|name|age|DBName|test|
+---+----+---+------+----+
|  1| acv| 34|     a|   1|
|  2| fbg| 56|     b|   3|
|  3| rty| 78|     c|   5|
+---+----+---+------+----+

DF1:-

val DF1 = Seq(("1","gbj","67","a","5"),("2","gbj","67","a","7"),("2","jku","88","b","8"),("4","jku","88","b","7"),("5","uuu","12","c","9")).toDF("id","name","age","DBName","col1")
    
+---+----+---+------+----+
| id|name|age|DBName|col1|
+---+----+---+------+----+
|  1| gbj| 67|     a|   5|
|  2| gbj| 67|     a|   7|
|  2| jku| 88|     b|   8|
|  4| jku| 88|     b|   7|
|  5| uuu| 12|     c|   9|
+---+----+---+------+----+

I want to merge DF1 into DF based on the values of id and DBName. If an (id, DBName) pair already exists in DF, the record should be updated; if it doesn't exist, a new record should be added. The resulting DataFrame should look like this:

    +---+----+---+------+----+----+
    | id|name|age|DBName|test|col1|
    +---+----+---+------+----+----+
    |  5| uuu| 12|     c|NULL|   9|
    |  2| jku| 88|     b|NULL|   8|
    |  4| jku| 88|     b|NULL|   7|
    |  1| gbj| 67|     a|NULL|   5|
    |  3| rty| 78|     c|   5|NULL|
    |  2| gbj| 67|     a|NULL|   7|
    +---+----+---+------+----+----+

This is what I have tried so far:

val updatedDF = DF.as("a").join(DF1.as("b"), $"a.id" === $"b.id" &&  $"a.DBName" === $"b.DBName", "outer").select(DF.columns.map(c => coalesce($"b.$c", $"b.$c") as c): _*)

Error:-

org.apache.spark.sql.AnalysisException: cannot resolve '`b.test`' given input columns: [b.DBName, a.DBName, a.name, b.age, a.id, a.age, b.id, a.test, b.name];;

Upvotes: 0

Views: 221

Answers (1)

mck

Reputation: 42422

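You're selecting columns that don't exist: DF.columns includes test, which DF1 doesn't have, so b.test can't be resolved. There is also a typo in the coalesce, which uses $"b.$c" for both arguments instead of falling back to $"a.$c". The example below fixes both: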

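// Outside spark-shell these imports are needed (spark is the active SparkSession)
import org.apache.spark.sql.functions.{coalesce, col}
import spark.implicits._

val updatedDF = DF.as("a").join(
    DF1.as("b"),
    $"a.id" === $"b.id" && $"a.DBName" === $"b.DBName",
    "outer"
).select(
    // Shared columns (all of DF's except the last): take DF1's value, else DF's
    DF.columns.dropRight(1).map(c => coalesce($"b.$c", $"a.$c") as c)
    :+ col(DF.columns.last)   // test, present only in DF
    :+ col(DF1.columns.last)  // col1, present only in DF1
    :_*
)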

updatedDF.show
+---+----+---+------+----+----+
| id|name|age|DBName|test|col1|
+---+----+---+------+----+----+
|  5| uuu| 12|     c|null|   9|
|  2| jku| 88|     b|   3|   8|
|  4| jku| 88|     b|null|   7|
|  1| gbj| 67|     a|   1|   5|
|  3| rty| 78|     c|   5|null|
|  2| gbj| 67|     a|null|   7|
+---+----+---+------+----+----+
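
The snippet above relies on the extra column being the last one in each DataFrame. If that isn't guaranteed, the column lists can be derived from the two schemas instead. Below is a minimal sketch of that idea; mergePreferRight and MergeSketch are hypothetical names of my own (not part of Spark), and it assumes the key columns exist in both DataFrames and that at least one key is given:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}

object MergeSketch {
  // Hypothetical helper (not a Spark API): full outer join on `keys`,
  // preferring the value from `right` for every column both frames share.
  def mergePreferRight(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
    val shared    = left.columns.filter(right.columns.contains)    // includes the keys
    val leftOnly  = left.columns.filterNot(right.columns.contains)
    val rightOnly = right.columns.filterNot(left.columns.contains)

    // a.k1 === b.k1 && a.k2 === b.k2 ... (assumes keys is non-empty)
    val joinCond = keys.map(k => col(s"a.$k") === col(s"b.$k")).reduce(_ && _)

    val selected =
      shared.map(c => coalesce(col(s"b.$c"), col(s"a.$c")).as(c)) ++
        leftOnly.map(c => col(s"a.$c")) ++
        rightOnly.map(c => col(s"b.$c"))

    left.as("a").join(right.as("b"), joinCond, "outer").select(selected: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge-sketch").getOrCreate()
    import spark.implicits._

    val DF = Seq(("1","acv","34","a","1"),("2","fbg","56","b","3"),("3","rty","78","c","5"))
      .toDF("id","name","age","DBName","test")
    val DF1 = Seq(("1","gbj","67","a","5"),("2","gbj","67","a","7"),("2","jku","88","b","8"),
      ("4","jku","88","b","7"),("5","uuu","12","c","9"))
      .toDF("id","name","age","DBName","col1")

    mergePreferRight(DF, DF1, Seq("id", "DBName")).show()
    spark.stop()
  }
}
```

With the DF and DF1 from the question this should give the same six rows as the output above, though the row order from show may differ. Note that, as in the snippet above, a null in DF1 for a shared column falls back to DF's value rather than overwriting it.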

Upvotes: 1
