Ishan

Reputation: 1006

Join in Spark dataframe (Scala) based on non-null values

Suppose I have two dataframes like the following:

First -

A    | B    | C    | D
1a   | 1b   | 1c   | 1d
2a   | null | 2c   | 2d
3a   | null | null | 3d
4a   | 4b   | null | null
5a   | null | null | null
6a   | 6b   | 6c   | null

Second -

P    | B    | C    | D
1p   | 1b   | 1c   | 1d
2p   | 2b   | 2c   | 2d
3p   | 3b   | 3c   | 3d
4p   | 4b   | 4c   | 4d 
5p   | 5b   | 5c   | 5d
6p   | 6b   | 6c   | 6d 

The join is performed on the columns {"B", "C", "D"}. If any of these columns is null in the first dataframe, the join should fall back to whichever of the remaining columns are non-null.

So, the result should be like -

P    | B    | C    | D    | A
1p   | 1b   | 1c   | 1d   | 1a
2p   | null | 2c   | 2d   | 2a
3p   | null | null | 3d   | 3a
4p   | 4b   | null | null | 4a // First(C) & First(D) were null, so we join only on B
6p   | 6b   | 6c   | null | 6a

Can anyone suggest a solution for this? Currently I am filtering rows that have null values in one column, two columns, or three columns, and then joining each group with Second without using those columns. For example, I first filter out the rows from First where only B is null, then join them with Second on "C" and "D". This way I end up with many dataframes, which I finally union together; a sketch of this approach follows.
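A minimal sketch of this filter-join-union idea, assuming the dataframes are named first and second as above and a SparkSession named spark is in scope (only two of the null patterns are written out):

import org.apache.spark.sql.functions.col
import spark.implicits._ // for the $ column syntax; assumes a SparkSession named `spark`

val cols = Seq("P", "B", "C", "D", "A")

// All three keys present in First: join on B, C and D.
val allKeys = first
  .filter($"B".isNotNull && $"C".isNotNull && $"D".isNotNull)
  .join(second, Seq("B", "C", "D"))
  .select(cols.map(col): _*)

// Only B is null: join on C and D, keep First's null B,
// and drop Second's B so the column name stays unambiguous.
val bNull = first
  .filter($"B".isNull && $"C".isNotNull && $"D".isNotNull)
  .join(second.drop("B"), Seq("C", "D"))
  .select(cols.map(col): _*)

// ...analogous dataframes for the remaining null patterns...

val result = allKeys.union(bNull) // .union(...) the other cases

With three nullable key columns this needs up to seven such branches (every pattern except all-null, which cannot join at all), which is why I am looking for something simpler.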

Upvotes: 4

Views: 4729

Answers (2)

Haroun Mohammedi

Reputation: 2424

I think a left join should do the work; try the following code:

import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $ column syntax; assumes a SparkSession named `spark`

// Returns the first non-null of its three arguments.
val group = udf((p1: String, p2: String, p3: String) =>
  if (p1 != null) p1 else if (p2 != null) p2 else if (p3 != null) p3 else null)

// Left-join First against Second once per key column, rename each
// matched P, then keep whichever join actually found a match.
val joined = first.join(second.select("B", "P"), Seq("B"), "left")
                  .withColumnRenamed("P", "P1")
                  .join(second.select("C", "P"), Seq("C"), "left")
                  .withColumnRenamed("P", "P2")
                  .join(second.select("D", "P"), Seq("D"), "left")
                  .withColumnRenamed("P", "P3")
                  .select($"A", $"B", $"C", $"D", group($"P1", $"P2", $"P3") as "P")
                  .where($"P".isNotNull)

Hope this helps; otherwise, comment with your problems.

Upvotes: 0

Ramesh Maharjan

Reputation: 41957

Here's what you can do:

import org.apache.spark.sql.functions._

// Match on any one of the three key columns; then drop Second's
// copies so First's (possibly null) B, C and D are kept.
df1.join(broadcast(df2), df1("B") === df2("B") || df1("C") === df2("C") || df1("D") === df2("D"))
  .drop(df2("B"))
  .drop(df2("C"))
  .drop(df2("D"))
  .show(false)

To be safe, you can broadcast whichever dataframe is smaller in size, as sketched below.
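For instance, if df1 happened to be the smaller table, the hint simply moves to the other side; the join condition and the dropped columns stay the same (a sketch under that assumption):

df2.join(broadcast(df1), df1("B") === df2("B") || df1("C") === df2("C") || df1("D") === df2("D"))
  .drop(df2("B"))
  .drop(df2("C"))
  .drop(df2("D"))
  .show(false)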

Upvotes: 2
