Reputation: 29
Given Example, But looking for N
number of columns comparison between two data frame as column-wise.
Given sample with 5 rows and 3 columns with EmpID
as Primary key.
How can I do this comparison in Spark core?
InputDf1:
|EMPID |Dept | Salary
--------------------------
|1 |HR | 100
|2 |IT | 200
|3 |Finance | 250
|4 |Accounts | 200
|5 |IT | 150
InfputDF2:
|EMPID |Dept |Salary
------------------------------
|1 |HR | 100
|2 |IT | 200
|3 |FIN | 250
|4 |Accounts | 150
|5 |IT | 150
Expected Result DF:
|EMPID |Dept |Dept |status |Salary |Salary |status
--------------------------------------------------------------------
|1 |HR |HR | TRUE | 100 | 100 | TRUE
|2 |IT |IT | TRUE | 200 | 200 | TRUE
|3 |Finance |FIN | False | 250 | 250 | TRUE
|4 |Accounts |Accounts | TRUE | 200 | 150 | FALSE
|5 |IT |IT | TRUE | 150 | 150 | TRUE
Upvotes: 1
Views: 144
Reputation: 2998
You could also do this in a way below:
//Source data
val df = Seq((1,"HR",100),(2,"IT",200),(3,"Finance",250),(4,"Accounts",200),(5,"IT",150)).toDF("EMPID","Dept","Salary")
val df1 = Seq((1,"HR",100),(2,"IT",200),(3,"Fin",250),(4,"Accounts",150),(5,"IT",150)).toDF("EMPID","Dept","Salary")
//joins and other operations
val finalDF = df.as("d").join(df1.as("d1"),Seq("EMPID"),"inner")
.withColumn("DeptStatus",$"d.Dept" === $"d1.Dept")
.withColumn("Salarystatus",$"d.Salary" === $"d1.Salary")
.selectExpr("EMPID","d.Dept","d1.Dept","DeptStatus as
Status","d.Salary","d1.Salary","SalaryStatus as Status")
display(finalDF)
You can see the output as below:
Upvotes: 1
Reputation: 32660
You can use join and then iterate over df.columns
to select the desired output columns :
val df_final = df1.alias("df1")
.join(df2.alias("df2"), "EMPID")
.select(
Seq(col("EMPID")) ++
df1.columns.filter(_ != "EMPID")
.flatMap(c =>
Seq(
col(s"df1.$c").as(s"df1_$c"),
col(s"df2.$c").as(s"df2_$c"),
(col(s"df1.$c") === col(s"df2.$c")).as(s"status_$c")
)
): _*
)
df_final.show
//+-----+--------+--------+-----------+----------+----------+-------------+
//|EMPID|df1_Dept|df2_Dept|status_Dept|df1_Salary|df2_Salary|status_Salary|
//+-----+--------+--------+-----------+----------+----------+-------------+
//| 1| HR| HR| true| 100| 100| true|
//| 2| IT| IT| true| 200| 200| true|
//| 3| Finance| FIN| false| 250| 250| true|
//| 4|Accounts|Accounts| true| 200| 150| false|
//| 5| IT| IT| true| 150| 150| true|
//+-----+--------+--------+-----------+----------+----------+-------------+
Upvotes: 1
Reputation: 42352
You can do a join using the EMPID and compare the resulting columns:
val result = df1.alias("df1").join(
df2.alias("df2"), "EMPID"
).select(
$"EMPID",
$"df1.Dept", $"df2.Dept",
($"df1.Dept" === $"df2.Dept").as("status"),
$"df1.Salary", $"df2.Salary",
($"df1.Salary" === $"df2.Salary").as("status")
)
result.show
+-----+--------+--------+------+------+------+------+
|EMPID| Dept| Dept|status|Salary|Salary|status|
+-----+--------+--------+------+------+------+------+
| 1| HR| HR| true| 100| 100| true|
| 2| IT| IT| true| 200| 200| true|
| 3| Finance| FIN| false| 250| 250| true|
| 4|Accounts|Accounts| true| 200| 150| false|
| 5| IT| IT| true| 150| 150| true|
+-----+--------+--------+------+------+------+------+
Note that you may wish to rename the columns because duplicate column names are not possible to query in the future.
Upvotes: 2