Reputation: 41
DataFrame 1:
+---------+---------+
|login_Id1|login_Id2|
+---------+---------+
| 1234567| 1234568|
| 1234567| null|
| null| 1234568|
| 1234567| 1000000|
| 1000000| 1234568|
| 1000000| 1000000|
+---------+---------+
DataFrame 2:
+--------+---------+-----------+
|login_Id|user_name| user_Email|
+--------+---------+-----------+
| 1234567|TestUser1|user1_Email|
| 1234568|TestUser2|user2_Email|
| 1234569|TestUser3|user3_Email|
| 1234570|TestUser4|user4_Email|
+--------+---------+-----------+
Expected Output
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
| 1234567| 1234568| 1234567|TestUser1|user1_Email|
| 1234567| null| 1234567|TestUser1|user1_Email|
| null| 1234568| 1234568|TestUser2|user2_Email|
| 1234567| 1000000| 1234567|TestUser1|user1_Email|
| 1000000| 1234568| 1234568|TestUser2|user2_Email|
| 1000000| 1000000| null| null| null|
+---------+---------+--------+---------+-----------+
My requirement is to join both DataFrames so as to get the additional information for each login Id from DataFrame 2. In most cases either login_Id1 or login_Id2 will have data; at times both columns may have data, in which case I want to use login_Id1 to perform the join. When neither column matches, I want null as the result.
I followed this link
Join in spark dataframe (scala) based on not null values
I tried with
DataFrame1.join(broadcast(DataFrame2), DataFrame1("login_Id1") === DataFrame2("login_Id") || DataFrame1("login_Id2") === DataFrame2("login_Id") )
The output that I get is
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
| 1234567| 1234568| 1234567|TestUser1|user1_Email|
| 1234567| 1234568| 1234568|TestUser2|user2_Email|
| 1234567| null| 1234567|TestUser1|user1_Email|
| null| 1234568| 1234568|TestUser2|user2_Email|
| 1234567| 1000000| 1234567|TestUser1|user1_Email|
| 1000000| 1234568| 1234568|TestUser2|user2_Email|
| 1000000| 1000000| null| null| null|
+---------+---------+--------+---------+-----------+
I get the expected behavior when either of the columns has a value. When both of them have values, a join is performed with both the columns (Row1, Row3). Doesn't || short-circuit in this case?
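(For context: the condition passed to join is a single Column expression, not Scala Boolean logic, so there is no operand ordering to short-circuit; every row pair that satisfies either side of the || is kept. A minimal illustration:)
// The condition is built as a Spark Column (an expression tree),
// not evaluated as a Scala Boolean:
val cond = DataFrame1("login_Id1") === DataFrame2("login_Id") ||
  DataFrame1("login_Id2") === DataFrame2("login_Id")
// cond is an org.apache.spark.sql.Column; Spark evaluates the whole Or
// for every row pair, so a row whose two ids match different df2 rows
// joins with both of them.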
Is there a way I can get the Expected dataframe?
As of now, I have a UDF that checks whether login_Id1 has a value (returns login_Id1) or login_Id2 has a value (returns login_Id2); if both of them have values I return login_Id1. I add the result of the UDF as another column (FilteredId) to DataFrame1.
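For reference, a minimal sketch of what such a UDF could look like (pickLoginId and withFilteredId are illustrative names, not the actual code):
import org.apache.spark.sql.functions.{col, udf}

// Prefer login_Id1 when it is present; otherwise fall back to login_Id2.
val pickLoginId = udf((id1: String, id2: String) =>
  if (id1 != null) id1 else id2)

val withFilteredId = DataFrame1.withColumn("FilteredId",
  pickLoginId(col("login_Id1"), col("login_Id2")))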
DataFrame1 after adding the FilteredId column with the UDF:
+---------+---------+----------+
|login_Id1|login_Id2|FilteredId|
+---------+---------+----------+
|  1234567|  1234568|   1234567|
|  1234567|     null|   1234567|
|     null|  1234568|   1234568|
|  1234567|  1000000|   1234567|
|  1000000|  1234568|   1000000|
|  1000000|  1000000|   1000000|
+---------+---------+----------+
Then I perform a join based on FilteredId === login_Id and get the result:
DataFrame1.join(broadcast(DataFrame2), DataFrame1("FilteredId") === DataFrame2("login_Id"),"left_outer" )
Is there a better way to achieve this result without a UDF, just with a join (one that behaves like a short-circuiting || operator)?
Update: I have included the use case pointed out by Leo, which my UDF approach misses. My exact requirement is: if either of the two input column values (login_Id1, login_Id2) matches the login_Id of DataFrame2, that login_Id's data should be fetched; if neither column matches, the result should be null (something like a left outer join).
Upvotes: 4
Views: 10252
Reputation: 22449
It's unclear to me whether your sample data already covers all scenarios of the login_Id pairs. If it does, a solution focusing on null checking will suffice; otherwise it would require something slightly more complex (such as your UDF approach).
One approach without relying on a UDF is to apply a left_outer join for login_Id1 and a left_semi join for login_Id2, each with an additional flag column for preference ordering; combine them via union, join df2 to include the non-key columns, and finally eliminate duplicate rows based on flag.
Here's sample code with slightly more generalized sample data:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // for toDF and $-notation (spark is the SparkSession)
val df1 = Seq(
  ("1234567", "1234568"),
  ("1234567", null),
  (null, "1234568"),
  ("1234569", "1000000"),
  ("1000000", "1234570"),
  ("1000000", "1000000")
).toDF("login_Id1", "login_Id2")

val df2 = Seq(
  ("1234567", "TestUser1", "user1_Email"),
  ("1234568", "TestUser2", "user2_Email"),
  ("1234569", "TestUser3", "user3_Email"),
  ("1234570", "TestUser4", "user4_Email")
).toDF("login_Id", "user_name", "user_Email")
// flag = 1 when login_Id1 found a match in df2; 9 (lowest preference) when it didn't
val dfOuter = df1.join(df2, $"login_Id1" === df2("login_Id"), "left_outer").
  withColumn("flag", when($"login_Id".isNull, lit(9)).otherwise(lit(1))).
  select("login_Id1", "login_Id2", "flag")
// +---------+---------+----+
// |login_Id1|login_Id2|flag|
// +---------+---------+----+
// | 1234567| 1234568| 1|
// | 1234567| null| 1|
// | null| 1234568| 9|
// | 1234569| 1000000| 1|
// | 1000000| 1234570| 9|
// | 1000000| 1000000| 9|
// +---------+---------+----+
// flag = 2 for rows whose login_Id2 has a match in df2
val dfSemi = df1.join(df2, $"login_Id2" === df2("login_Id"), "left_semi").
  withColumn("flag", lit(2))
// +---------+---------+----+
// |login_Id1|login_Id2|flag|
// +---------+---------+----+
// | 1234567| 1234568| 2|
// | null| 1234568| 2|
// | 1000000| 1234570| 2|
// +---------+---------+----+
// keep only the row with the smallest flag per (login_Id1, login_Id2) pair,
// derive login_Id from whichever column won, then pull in the df2 columns
val window = Window.partitionBy("login_Id1", "login_Id2").orderBy("flag")

(dfOuter union dfSemi).
  withColumn("row_num", row_number.over(window)).
  where($"row_num" === 1).
  withColumn("login_Id", when($"flag" === 1, $"login_Id1").
    otherwise(when($"flag" === 2, $"login_Id2"))
  ).
  join(df2, Seq("login_Id"), "left_outer").
  select("login_Id1", "login_Id2", "login_Id", "user_name", "user_Email")
// +---------+---------+--------+---------+-----------+
// |login_Id1|login_Id2|login_Id|user_name| user_Email|
// +---------+---------+--------+---------+-----------+
// | 1000000| 1000000| null| null| null|
// | 1000000| 1234570| 1234570|TestUser4|user4_Email|
// | 1234567| 1234568| 1234567|TestUser1|user1_Email|
// | 1234569| 1000000| 1234569|TestUser3|user3_Email|
// | 1234567| null| 1234567|TestUser1|user1_Email|
// | null| 1234568| 1234568|TestUser2|user2_Email|
// +---------+---------+--------+---------+-----------+
Note that you can apply broadcast to df2 as in your existing sample code if it's significantly smaller than df1. In case df2 is small enough to be collect-ed, the solution can be much simplified to something like the following:
val loginIdList = df2.collect.map(r => r.getAs[String](0))

// rows in which neither id appears anywhere in df2
val df1Unmatched = df1.where(
  !$"login_Id1".isin(loginIdList: _*) && !$"login_Id2".isin(loginIdList: _*)
)

// join the matchable rows (login_Id2 is used only when login_Id1 cannot match),
// then re-attach the unmatched rows with nulls via a left outer join
(df1 except df1Unmatched).
  join(broadcast(df2), $"login_Id1" === $"login_Id" ||
    ($"login_Id2" === $"login_Id" &&
      ($"login_Id1".isNull || !$"login_Id1".isin(loginIdList: _*))
    )
  ).
  union(
    df1Unmatched.join(df2, $"login_Id2" === $"login_Id", "left_outer")
  )
Upvotes: 1
Reputation: 37842
You can use the coalesce function to create a new value that's either login_Id1 (if it's not null) or login_Id2 (if login_Id1 was null), and compare that result to login_Id:
import org.apache.spark.sql.functions._
import spark.implicits._
val res = DataFrame1.join(DataFrame2, coalesce($"login_Id1", $"login_Id2") === $"login_Id")
res.show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
| 1234567| null| 1234567|TestUser1|user1_Email|
| 1234567| 1234568| 1234567|TestUser1|user1_Email|
| null| 1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+
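Note that this is an inner join, so pairs where neither id matches (such as 1000000/1000000) are dropped. If you need those rows kept with nulls, as in the expected output, the same coalesce condition should also work with a left outer join (a sketch, not part of the original answer):
val resOuter = DataFrame1.join(DataFrame2,
  coalesce($"login_Id1", $"login_Id2") === $"login_Id", "left_outer")
Like the UDF approach, though, this always prefers a non-null login_Id1, so a row such as (1000000, 1234568) would still come back with nulls instead of falling back to login_Id2.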
Upvotes: 0
Reputation: 375685
You only want the second column if the first is null, so add that condition to your join clause:
@ df1.join(df2, df1("login_Id1") <=> df2("login_Id") || (df1("login_Id1").isNull && df1("login_Id2") <=> df2("login_Id"))).show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
| 1234567| 1234568| 1234567|TestUser1|user1_Email|
| 1234567| null| 1234567|TestUser1|user1_Email|
| null| 1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+
Note: the right-hand side of the condition finds only this row:
@ df1.join(df2, df1("login_Id1").isNull && df1("login_Id2") <=> df2("login_Id")).show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
| null| 1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+
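As a side note, <=> is Spark's null-safe equality: where === evaluates to null when either operand is null, <=> returns false (or true when both sides are null), so each clause of the join condition stays well-defined on rows with a missing login_Id1. A small illustration, assuming the df1 above:
df1.select(
  (df1("login_Id1") === "1234567").as("eq"),           // null where login_Id1 is null
  (df1("login_Id1") <=> "1234567").as("eq_null_safe")  // false where login_Id1 is null
).show()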
Upvotes: 0