I am creating a function that takes join keys and a condition as parameters and dynamically joins two DataFrames.
I understand a Spark Scala DataFrame join can be done in the following ways:
1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
The join keys/usingColumns parameter will be a list of column names.
condition/joinExprs - I am not sure how to pass it, but it could be a string like "df2(colname) === 'xyz'"
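As a side note, if the condition does arrive as a SQL string, Spark's built-in `expr` function can parse it into a `Column` that `join` accepts (a sketch; the column name and value here are illustrative):

```scala
import org.apache.spark.sql.functions.expr

// Parse an illustrative SQL-style condition string into a Column
val condition = expr("dept_id = 10")
// It can then be combined with other Column expressions:
// df1.join(df2, keyExpr && condition, "inner")
```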
Based on this post, I came up with the code below. It takes care of the join keys list, but how can I add the condition as well? (Note: I used identical DataFrames here for simplicity.)
%scala
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
  (2,"Rose",1,"2010","20","M",4000))
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","dept_id","gender","salary")
import spark.implicits._
val empDF = emp.toDF(empColumns:_*)
val empDF2 = emp.toDF(empColumns:_*)
val join_keys = Seq("emp_id","name") // this will be a parameter
val joinExprs = join_keys.map{case (c1) => empDF(c1) === empDF2(c1)}.reduce(_ && _)
// How do I add to joinExprs, another joinExpr like "empDF2(dept_id) == 10" here?
empDF.join(empDF2,joinExprs,"inner").show(false)
You can just append to joinExprs with &&:
empDF.join(empDF2,joinExprs && empDF2("dept_id") === 10,"inner").show(false)
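To make this reusable, the key-equality expression and the optional extra condition can be folded into one helper. The sketch below uses illustrative names (joinOn, extraCond are assumptions, not part of any Spark API):

```scala
import org.apache.spark.sql.{Column, DataFrame}

// Sketch of a dynamic join helper: builds the key-equality expression
// from a list of column names and AND-s in an optional extra condition.
def joinOn(left: DataFrame,
           right: DataFrame,
           keys: Seq[String],
           extraCond: Option[Column] = None,
           joinType: String = "inner"): DataFrame = {
  val keyExpr = keys.map(c => left(c) === right(c)).reduce(_ && _)
  val cond = extraCond.map(keyExpr && _).getOrElse(keyExpr)
  left.join(right, cond, joinType)
}

// Usage with the DataFrames above:
// joinOn(empDF, empDF2, Seq("emp_id", "name"), Some(empDF2("dept_id") === 10))
```

Passing the extra condition as an `Option[Column]` keeps the key-only call site unchanged while allowing any `Column` expression to be AND-ed in.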