Teju Priya
Teju Priya

Reputation: 665

Compare two dataframes and update the values

I have two dataframes like following.

val file1 = spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("file1.csv")
file1.show()
+---+-------+-----+-----+-------+
| id|   name|mark1|mark2|version|
+---+-------+-----+-----+-------+
|  1| Priya |   80|   99|      0|
|  2| Teju  |   10|    5|      0|
+---+-------+-----+-----+-------+
val file2 = spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("file2.csv")
file2.show()
+---+-------+-----+-----+-------+
| id|   name|mark1|mark2|version|
+---+-------+-----+-----+-------+
|  1| Priya |   80|   99|      0|
|  2| Teju  |   70|    5|      0|
+---+-------+-----+-----+-------+

Now I am comparing two dataframes and filtering out the mismatch values like this.

val columns = file1.schema.fields.map(_.name)
val selectiveDifferences = columns.map(col => file1.select(col).except(file2.select(col)))
selectiveDifferences.map(diff => {if(diff.count > 0) diff.show})
+-----+
|mark1|
+-----+
|   10|
+-----+

I need to add the extra row into the dataframe, 1 for the mismatch value from the dataframe 2 and update the version number like this.

file1.show()
+---+-------+-----+-----+-------+
| id|   name|mark1|mark2|version|
+---+-------+-----+-----+-------+
|  1| Priya |   80|   99|      0|
|  2| Teju  |   10|    5|      0|
|  3| Teju  |   70|    5|      1|
+---+-------+-----+-----+-------+

I am struggling to achieve the above step and it is my expected output. Any help would be appreciated.

Upvotes: 4

Views: 1613

Answers (1)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

You can get your final dataframe by using except and union as following

val count = file1.count()

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
file1.union(file2.except(file1)
                  .withColumn("version", lit(1))      //changing the version
                  .withColumn("id", (row_number.over(Window.orderBy("id")))+lit(count))   //changing the id number
  )

lit, row_number and window functions are used to generate the id and versions

Note : use of window function to generate the new id makes the process inefficient as all the data would be collected in one executor for generating new id

Upvotes: 4

Related Questions