Shankar
Shankar

Reputation: 8967

Spark SQL Dataframe API -build filter condition dynamically

I have two Spark dataframe's, df1 and df2:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| ramesh| 1212| 29|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+
+------+-----+---+-----+
| eName|  eNo|age| city|
+------+-----+---+-----+
|aarush|12121| 15|malmo|
|ramesh| 1212| 29|malmo|
+------+-----+---+-----+

I need to get the non matching records from df1, based on a number of columns which is specified in another file.

For example, the column look up file is something like below:

df1col,df2col
name,eName
empNo, eNo

Expected output is:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
|shankar|12121| 28|
| suresh| 1111| 30|
| aarush| 0707| 15|
+-------+-----+---+

The idea is how to build a where condition dynamically for the above scenario, because the lookup file is configurable, so it might have 1 to n fields.

Upvotes: 0

Views: 775

Answers (2)

Shaido
Shaido

Reputation: 28392

You can use the except dataframe method. I'm assuming that the columns to use are in two lists for simplicity. It's necessary that the order of both lists are correct, the columns on the same location in the list will be compared (regardless of column name). After except, use join to get the missing columns from the first dataframe.

val df1 = Seq(("shankar","12121",28),("ramesh","1212",29),("suresh","1111",30),("aarush","0707",15))
  .toDF("name", "empNo", "age")
val df2 = Seq(("aarush", "12121", 15, "malmo"),("ramesh", "1212", 29, "malmo"))
  .toDF("eName", "eNo", "age", "city")

val df1Cols = List("name", "empNo")
val df2Cols = List("eName", "eNo")

val tempDf = df1.select(df1Cols.head, df1Cols.tail: _*)
  .except(df2.select(df2Cols.head, df2Cols.tail: _*))    
val df = df1.join(broadcast(tempDf), df1Cols)

The resulting dataframe will look as wanted:

+-------+-----+---+
|   name|empNo|age|
+-------+-----+---+
| aarush| 0707| 15|
| suresh| 1111| 30|
|shankar|12121| 28|
+-------+-----+---+

Upvotes: 3

Ameer
Ameer

Reputation: 2638

If you're doing this from a SQL query I would remap the column names in the SQL query itself with something like Changing a SQL column title via query. You could do a simple text replace in the query to normalize them to the df1 or df2 column names.

Once you have that you can diff using something like How to obtain the difference between two DataFrames?

If you need more columns that wouldn't be used in the diff (e.g. age) you can reselect the data again based on your diff results. This may not be the optimal way of doing it but it would probably work.

Upvotes: 0

Related Questions