Reputation: 1006
Suppose I have two dataframes like the following:
First -
A | B | C | D
1a | 1b | 1c | 1d
2a | null | 2c | 2d
3a | null | null | 3d
4a | 4b | null | null
5a | null | null | null
6a | 6b | 6c | null
Second -
P | B | C | D
1p | 1b | 1c | 1d
2p | 2b | 2c | 2d
3p | 3b | 3c | 3d
4p | 4b | 4c | 4d
5p | 5b | 5c | 5d
6p | 6b | 6c | 6d
The join is performed on the columns {"B", "C", "D"}. If any of these columns contains null, the match should fall back to the remaining non-null columns.
So, the result should be like -
P | B | C | D | A
1p | 1b | 1c | 1d | 1a
2p | null | 2c | 2d | 2a
3p | null | null | 3d | 3a
4p | 4b | null | null | 4a // First(C) & First(D) were null, so we match on B only
6p | 6b | 6c | null | 6a
Can anyone suggest a solution for this query? Currently I am filtering rows that have null values in one column, two columns, or three columns, then joining each subset with Second without the null columns. For example, I first filter out rows from First where only B is null, then join them with Second on "C" and "D". This way I end up with many dataframes, which I finally union together.
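For illustration, here is how I read the matching rule, sketched in plain Scala over in-memory rows (no Spark involved, and the case-class names are just placeholders): a row of First matches a row of Second when every non-null key column of First equals the corresponding column of Second, and at least one key is non-null.

```scala
case class FirstRow(a: String, b: String, c: String, d: String)
case class SecondRow(p: String, b: String, c: String, d: String)

// A match requires all non-null keys of First to agree with Second,
// and at least one key to be non-null (row 5a matches nothing).
def matches(f: FirstRow, s: SecondRow): Boolean = {
  val pairs   = Seq(f.b -> s.b, f.c -> s.c, f.d -> s.d)
  val nonNull = pairs.filter(_._1 != null)
  nonNull.nonEmpty && nonNull.forall { case (x, y) => x == y }
}
```

This is only meant to pin down the intended semantics, not as a Spark solution.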
Upvotes: 4
Views: 4729
Reputation: 2424
I think a left join should do the job; try the following code:
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $"..." column syntax

// Picks the first non-null of the three candidate P values
val group = udf((p1: String, p2: String, p3: String) =>
  if (p1 != null) p1 else if (p2 != null) p2 else if (p3 != null) p3 else null)

// Join on each key column separately; null keys never match, so a null
// in B, C or D simply yields a null P for that leg of the join
val joined = first.join(second.select("B", "P"), Seq("B"), "left")
  .withColumnRenamed("P", "P1")
  .join(second.select("C", "P"), Seq("C"), "left")
  .withColumnRenamed("P", "P2")
  .join(second.select("D", "P"), Seq("D"), "left")
  .withColumnRenamed("P", "P3")
  .select($"A", $"B", $"C", $"D", group($"P1", $"P2", $"P3") as "P")
  .where($"P".isNotNull) // drop rows that matched on no column at all
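As a side note, the `group` UDF just picks the first non-null of its arguments, i.e. the same behaviour as SQL COALESCE. In plain Scala that is:

```scala
// First-non-null selection, equivalent to what the group UDF does
def firstNonNull(ps: String*): String = ps.find(_ != null).orNull
```

So in Spark you could equally write `coalesce($"P1", $"P2", $"P3")` with the built-in `org.apache.spark.sql.functions.coalesce` and drop the UDF.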
Hope this helps; otherwise, comment with the problems you run into.
Upvotes: 0
Reputation: 41957
Here's what you can do
import org.apache.spark.sql.functions._

// null === anything evaluates to null (treated as false), so a null key
// column simply drops out of the || condition
df1.join(broadcast(df2), df1("B") === df2("B") || df1("C") === df2("C") || df1("D") === df2("D"))
  .drop(df2("B"))
  .drop(df2("C"))
  .drop(df2("D"))
  .show(false)
To be safer, broadcast whichever dataframe is smaller in size.
Upvotes: 2