Gadam

Reputation: 3024

Spark Scala dataframe join dynamically using list of columns and joinExprs

I am creating a function which takes join keys and condition as parameters and dynamically joins two dataframes.

I understand Spark Scala Dataframe join done the following ways:

1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

The join keys/usingColumns parameter will be a list of column names. For the condition/joinExprs parameter I am not sure how to pass it, but it could be something like "df2(colname) === 'xyz'".

Based on this post, I came up with the code below. It takes care of the join keys list, but how can I add the condition as well? (Note: I used identical dataframes here for simplicity.)

%scala
import spark.sqlContext.implicits._

val emp = Seq(
  (1, "Smith", -1, "2018", "10", "M", 3000),
  (2, "Rose",   1, "2010", "20", "M", 4000)
)
val empColumns = Seq("emp_id", "name", "superior_emp_id", "year_joined", "dept_id", "gender", "salary")
val empDF  = emp.toDF(empColumns: _*)
val empDF2 = emp.toDF(empColumns: _*)

val join_keys = Seq("emp_id", "name") // this will be a parameter
val joinExprs = join_keys.map(c => empDF(c) === empDF2(c)).reduce(_ && _)

// How do I add to joinExprs another join expression, like empDF2("dept_id") === 10, here?

empDF.join(empDF2, joinExprs, "inner").show(false)

Upvotes: 0

Views: 1940

Answers (1)

Raphael Roth

Reputation: 27383

You can simply chain another condition onto joinExprs with &&:

empDF.join(empDF2, joinExprs && empDF2("dept_id") === 10, "inner").show(false)
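Putting both pieces together, the whole thing can be wrapped in a reusable helper that takes the key list and an optional extra predicate. This is only a sketch; the function name `joinDynamic` and its signature are my own illustration, not from the question:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

// Illustrative helper (name and signature are assumptions, not Spark API):
// builds the equi-join condition from the key list, then ANDs in an
// optional extra predicate before performing the join.
def joinDynamic(
    left: DataFrame,
    right: DataFrame,
    joinKeys: Seq[String],
    extraCond: Option[Column] = None,
    joinType: String = "inner"): DataFrame = {
  // Equi-join condition built from the key list, as in the question
  val keyExpr = joinKeys
    .map(c => left(c) === right(c))
    .reduceOption(_ && _)
    .getOrElse(lit(true)) // no keys: condition is always true
  // Fold in the optional extra predicate, as in the answer
  val fullExpr = extraCond.map(keyExpr && _).getOrElse(keyExpr)
  left.join(right, fullExpr, joinType)
}

// Usage with the example dataframes:
// joinDynamic(empDF, empDF2, Seq("emp_id", "name"),
//             Some(empDF2("dept_id") === 10)).show(false)
```

Passing the extra condition as a `Column` (rather than a string) keeps the helper composable: callers can combine any predicates with `&&`/`||` before passing them in.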

Upvotes: 2
