Mahesh

Reputation: 29

Column-wise comparison between Spark DataFrames using Spark Core

The example below is small, but I'm looking for a way to compare N columns between two DataFrames, column by column.

The sample has 5 rows and 3 columns, with EMPID as the primary key.

How can I do this comparison in Spark Core?

InputDF1:

|EMPID |Dept     |  Salary
--------------------------
|1     |HR       |   100
|2     |IT       |   200
|3     |Finance  |   250
|4     |Accounts |   200
|5     |IT       |   150

InputDF2:

|EMPID |Dept       |Salary
------------------------------
|1     |HR         | 100
|2     |IT         | 200
|3     |FIN        | 250
|4     |Accounts   | 150
|5     |IT         | 150

Expected Result DF:

|EMPID   |Dept      |Dept      |status      |Salary     |Salary   |status
--------------------------------------------------------------------
|1       |HR        |HR        | TRUE       | 100       | 100     | TRUE
|2       |IT        |IT        | TRUE       | 200       | 200     | TRUE
|3       |Finance   |FIN       | FALSE      | 250       | 250     | TRUE
|4       |Accounts  |Accounts  | TRUE       | 200       | 150     | FALSE
|5       |IT        |IT        | TRUE       | 150       | 150     | TRUE
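The comparison being asked for is an inner join on EMPID followed by one equality check per non-key column, so it generalizes to any list of N columns. Here is a minimal plain-Scala sketch of that logic on in-memory collections, with no Spark dependency. `ColumnCompare`, `Table`, `Cell` and `compare` are hypothetical names, and both tables are modelled as maps from EMPID to the row's other columns:

```scala
// Plain-Scala sketch of a column-wise comparison keyed by EMPID.
// No Spark dependency; all names here are illustrative, not a Spark API.
object ColumnCompare {
  // A table maps the primary key to that row's non-key columns.
  type Table = Map[Int, Map[String, Any]]

  // Both values of one column for one key, plus whether they match.
  final case class Cell(left: Any, right: Any, status: Boolean)

  val df1: Table = Map(
    1 -> Map("Dept" -> "HR", "Salary" -> 100),
    2 -> Map("Dept" -> "IT", "Salary" -> 200),
    3 -> Map("Dept" -> "Finance", "Salary" -> 250),
    4 -> Map("Dept" -> "Accounts", "Salary" -> 200),
    5 -> Map("Dept" -> "IT", "Salary" -> 150)
  )

  val df2: Table = Map(
    1 -> Map("Dept" -> "HR", "Salary" -> 100),
    2 -> Map("Dept" -> "IT", "Salary" -> 200),
    3 -> Map("Dept" -> "FIN", "Salary" -> 250),
    4 -> Map("Dept" -> "Accounts", "Salary" -> 150),
    5 -> Map("Dept" -> "IT", "Salary" -> 150)
  )

  // Inner join on the key (keys present in both tables, sorted),
  // then compare every requested column; works for any number of columns.
  def compare(left: Table, right: Table, columns: Seq[String]): Seq[(Int, Seq[Cell])] =
    left.keys.toSeq.sorted.filter(right.contains).map { id =>
      val (l, r) = (left(id), right(id))
      id -> columns.map(c => Cell(l(c), r(c), l(c) == r(c)))
    }

  // Prints one line per EMPID, e.g. "3 | Finance | FIN | FALSE | 250 | 250 | TRUE"
  def main(args: Array[String]): Unit =
    compare(df1, df2, Seq("Dept", "Salary")).foreach { case (id, cells) =>
      val fields = cells.flatMap(c => Seq(c.left, c.right, c.status.toString.toUpperCase))
      println((id +: fields).mkString(" | "))
    }
}
```

In Spark Core the same shape maps onto `join` between two pair RDDs keyed by EMPID; the answers below use a DataFrame join instead.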

Upvotes: 1

Views: 144

Answers (3)

Nikunj Kakadiya

Reputation: 2998

You could also do it as follows:

// Source data (spark.implicits._ is pre-imported in notebooks and spark-shell)
import spark.implicits._
val df = Seq((1,"HR",100),(2,"IT",200),(3,"Finance",250),(4,"Accounts",200),(5,"IT",150)).toDF("EMPID","Dept","Salary")
val df1 = Seq((1,"HR",100),(2,"IT",200),(3,"Fin",250),(4,"Accounts",150),(5,"IT",150)).toDF("EMPID","Dept","Salary")

// Join on EMPID, add one boolean status column per compared column, then reorder
val finalDF = df.as("d").join(df1.as("d1"), Seq("EMPID"), "inner")
  .withColumn("DeptStatus", $"d.Dept" === $"d1.Dept")
  .withColumn("SalaryStatus", $"d.Salary" === $"d1.Salary")
  .selectExpr("EMPID", "d.Dept", "d1.Dept", "DeptStatus as Status",
    "d.Salary", "d1.Salary", "SalaryStatus as Status")
display(finalDF) // Databricks-only; use finalDF.show() elsewhere

The resulting DataFrame has the columns EMPID, Dept, Dept, Status, Salary, Salary, Status, in the same layout as the expected result.
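The answer above hard-codes one withColumn per compared column. Since `selectExpr` accepts a variable number of SQL expression strings, one way to extend it to N columns is to generate those strings from the column list. This is a minimal sketch, assuming the aliases `d`/`d1` from the answer; `CompareExprs`, `build` and the output naming scheme (`d_<col>`, `d1_<col>`, `<col>_status`) are hypothetical, chosen so the result has no duplicate column names. It also uses Spark SQL's null-safe `<=>` instead of `=`, so two NULLs compare as true rather than yielding NULL; swap in `=` to keep the answer's original semantics.

```scala
// Generates selectExpr arguments for a column-wise comparison of any number
// of columns. Aliases default to "d"/"d1" as in the answer; the output
// naming scheme is an assumption made to avoid duplicate column names.
object CompareExprs {
  def build(allColumns: Seq[String], key: String,
            left: String = "d", right: String = "d1"): Seq[String] =
    key +: allColumns.filter(_ != key).flatMap { c =>
      Seq(
        s"$left.`$c` AS `${left}_$c`",
        s"$right.`$c` AS `${right}_$c`",
        // <=> is Spark SQL's null-safe equality: NULL <=> NULL is true
        s"$left.`$c` <=> $right.`$c` AS `${c}_status`"
      )
    }
}
```

With the DataFrames from the answer, this would be used as `df.as("d").join(df1.as("d1"), Seq("EMPID"), "inner").selectExpr(CompareExprs.build(df.columns.toSeq, "EMPID"): _*)`, and adding columns to both DataFrames requires no code change.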

Upvotes: 1

blackbishop

Reputation: 32660

You can join the two DataFrames and then iterate over df1.columns to build the desired output columns:

import org.apache.spark.sql.functions.col

// Join on the key, then emit (df1 value, df2 value, equality flag) for every other column
val df_final = df1.alias("df1")
  .join(df2.alias("df2"), "EMPID")
  .select(
    Seq(col("EMPID")) ++
      df1.columns.filter(_ != "EMPID")
        .flatMap(c =>
          Seq(
            col(s"df1.$c").as(s"df1_$c"),
            col(s"df2.$c").as(s"df2_$c"),
            (col(s"df1.$c") === col(s"df2.$c")).as(s"status_$c")
          )
        ): _*
  )

df_final.show

//+-----+--------+--------+-----------+----------+----------+-------------+
//|EMPID|df1_Dept|df2_Dept|status_Dept|df1_Salary|df2_Salary|status_Salary|
//+-----+--------+--------+-----------+----------+----------+-------------+
//|    1|      HR|      HR|       true|       100|       100|         true|
//|    2|      IT|      IT|       true|       200|       200|         true|
//|    3| Finance|     FIN|      false|       250|       250|         true|
//|    4|Accounts|Accounts|       true|       200|       150|        false|
//|    5|      IT|      IT|       true|       150|       150|         true|
//+-----+--------+--------+-----------+----------+----------+-------------+

Upvotes: 1

mck

Reputation: 42352

You can join on EMPID and compare the resulting columns:

import spark.implicits._ // enables the $"col" syntax (pre-imported in spark-shell)

val result = df1.alias("df1").join(
    df2.alias("df2"), "EMPID"
).select(
    $"EMPID",
    $"df1.Dept", $"df2.Dept",
    ($"df1.Dept" === $"df2.Dept").as("status"),
    $"df1.Salary", $"df2.Salary",
    ($"df1.Salary" === $"df2.Salary").as("status")
)

result.show
+-----+--------+--------+------+------+------+------+
|EMPID|    Dept|    Dept|status|Salary|Salary|status|
+-----+--------+--------+------+------+------+------+
|    1|      HR|      HR|  true|   100|   100|  true|
|    2|      IT|      IT|  true|   200|   200|  true|
|    3| Finance|     FIN| false|   250|   250|  true|
|    4|Accounts|Accounts|  true|   200|   150| false|
|    5|      IT|      IT|  true|   150|   150|  true|
+-----+--------+--------+------+------+------+------+

Note that you may want to rename the columns: this result has duplicate names (two Dept, two Salary and two status columns), and duplicate column names cannot be referenced unambiguously in later queries.
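If you keep this layout, one way to follow that advice is `Dataset.toDF(colNames: _*)`, which replaces all column names positionally. Below is a minimal sketch of a hypothetical helper, `UniqueNames`, that suffixes repeated names (`Dept`, `Dept_2`, ...). It counts names case-insensitively because Spark resolves column names case-insensitively by default (`spark.sql.caseSensitive=false`), and it does not guard against a generated name colliding with an existing one such as a real `Dept_2` column.

```scala
// Hypothetical helper: makes column names unique by suffixing repeats
// with _2, _3, ... Names are counted case-insensitively, matching Spark's
// default column resolution. Does not check for collisions with existing
// names that already end in such a suffix.
object UniqueNames {
  def apply(names: Seq[String]): Seq[String] = {
    val seen = scala.collection.mutable.Map.empty[String, Int]
    names.map { n =>
      val key = n.toLowerCase
      val count = seen.getOrElse(key, 0) + 1
      seen(key) = count
      if (count == 1) n else s"${n}_$count"
    }
  }
}
```

Applied as `val renamed = result.toDF(UniqueNames(result.columns.toSeq): _*)`, the header above would become EMPID, Dept, Dept_2, status, Salary, Salary_2, status_2.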

Upvotes: 2
