Aradhana

Reputation: 41

Spark DataFrames join on 2 columns using an OR operator

DataFrame 1:

+---------+---------+
|login_Id1|login_Id2|
+---------+---------+
|  1234567|  1234568|
|  1234567|     null|
|     null|  1234568|
|  1234567|  1000000|
|  1000000|  1234568|
|  1000000|  1000000|
+---------+---------+

DataFrame 2:

+--------+---------+-----------+
|login_Id|user_name| user_Email|
+--------+---------+-----------+
| 1234567|TestUser1|user1_Email|
| 1234568|TestUser2|user2_Email|
| 1234569|TestUser3|user3_Email|
| 1234570|TestUser4|user4_Email|
+--------+---------+-----------+

Expected Output

+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
|  1234567|  1234568| 1234567|TestUser1|user1_Email|
|  1234567|     null| 1234567|TestUser1|user1_Email|
|     null|  1234568| 1234568|TestUser2|user2_Email|
|  1234567|  1000000| 1234567|TestUser1|user1_Email|
|  1000000|  1234568| 1234568|TestUser2|user2_Email|
|  1000000|  1000000|    null|     null|       null|
+---------+---------+--------+---------+-----------+

My requirement is to join both DataFrames so as to get the additional information for each login id from DataFrame 2. In most cases either login_Id1 or login_Id2 will have data; at times both columns may have data. In that case I want to use login_Id1 to perform the join. When neither column matches, I want null as the result.

I followed this link

Join in spark dataframe (scala) based on not null values

I tried with

DataFrame1.join(broadcast(DataFrame2), DataFrame1("login_Id1") === DataFrame2("login_Id") || DataFrame1("login_Id2") === DataFrame2("login_Id"), "left_outer")

The output that I get is

+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
|  1234567|  1234568| 1234567|TestUser1|user1_Email|
|  1234567|  1234568| 1234568|TestUser2|user2_Email|
|  1234567|     null| 1234567|TestUser1|user1_Email|
|     null|  1234568| 1234568|TestUser2|user2_Email|
|  1234567|  1000000| 1234567|TestUser1|user1_Email|
|  1000000|  1234568| 1234568|TestUser2|user2_Email|
|  1000000|  1000000|    null|     null|       null|
+---------+---------+--------+---------+-----------+

I get the expected behavior when only one of the columns has a value. When both of them have values, a join is performed on both columns, producing two output rows (rows 1 and 2 above). Doesn't || short-circuit in this case?

Is there a way I can get the expected DataFrame?

As of now, I have a UDF that checks whether login_Id1 has a value (returns login_Id1) or login_Id2 has a value (returns login_Id2); if both have values, it returns login_Id1. I then add the result of the UDF as another column (FilteredId) to DataFrame1.
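A minimal sketch of such a UDF, assuming string-typed id columns (the names here are illustrative):

import org.apache.spark.sql.functions._

// Prefer login_Id1 whenever it has a value; otherwise fall back to login_Id2.
val pickLoginId = udf { (id1: String, id2: String) =>
  if (id1 != null) id1 else id2
}

val dataFrame1WithId = DataFrame1.withColumn(
  "FilteredId",
  pickLoginId(DataFrame1("login_Id1"), DataFrame1("login_Id2"))
)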

DataFrame1 after adding the FilteredId column with the UDF:

+---------+---------+----------+
|login_Id1|login_Id2|FilteredId|
+---------+---------+----------+
|  1234567|  1234568|   1234567|
|  1234567|     null|   1234567|
|     null|  1234568|   1234568|
|  1234567|  1000000|   1234567|
|  1000000|  1234568|   1000000|
|  1000000|  1000000|   1000000|
+---------+---------+----------+

Then I perform a join based on FilteredId === login_Id and get the result:

DataFrame1.join(broadcast(DataFrame2), DataFrame1("FilteredId") === DataFrame2("login_Id"),"left_outer" )

Is there a better way to achieve this result without a UDF, just with a join (one that behaves like a short-circuiting OR operator)?

Edit: included the use case pointed out by Leo, which my UDF approach misses. My exact requirement is: if either of the two input column values (login_Id1, login_Id2) matches the login_Id of DataFrame2, that login_Id's data should be fetched, preferring login_Id1 when both match. If neither column matches, it should add null (something like a left outer join).
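To make the requirement concrete, it can be expressed as a two-pass join (a sketch, assuming string-typed id columns; whether it can be collapsed into a single join is the open question): match on login_Id1 first, then retry only the misses on login_Id2, keeping unmatched rows via left_outer.

import org.apache.spark.sql.functions._

// First pass: match on login_Id1.
val byId1 = DataFrame1.join(broadcast(DataFrame2),
  DataFrame1("login_Id1") === DataFrame2("login_Id"), "left_outer")

val hits = byId1.where(col("login_Id").isNotNull)

// Second pass: rows with no login_Id1 match retry on login_Id2;
// left_outer keeps them with nulls when login_Id2 misses as well.
val misses = byId1.where(col("login_Id").isNull)
  .select(DataFrame1.columns.map(col): _*)

val result = hits.union(
  misses.join(broadcast(DataFrame2),
    col("login_Id2") === DataFrame2("login_Id"), "left_outer"))

Unlike the single || condition, this never produces two output rows when both ids match, and the (1000000, 1000000) row survives with nulls.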

Upvotes: 4

Views: 10252

Answers (3)

Leo C

Reputation: 22449

It's unclear to me whether your sample data already covers all scenarios of the login_Id pairs. If it does, a solution focused on null checking will suffice; otherwise something slightly more complex (such as your UDF approach) is required.

One approach that avoids a UDF is to left_outer join df1 to df2 on login_Id1 and left_semi join it to df2 on login_Id2, each with an added flag column for preference ordering; union the two results, deduplicate by flag, derive the winning login_Id, and finally left_outer join df2 again to pull in the non-key columns.

Here's sample code with slightly more generalized sample data:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // for toDF and the $"..." column syntax (implicit in spark-shell)

val df1 = Seq(
  ("1234567", "1234568"),
  ("1234567", null),
  (null, "1234568"),
  ("1234569", "1000000"),
  ("1000000", "1234570"),
  ("1000000", "1000000")
).toDF("login_Id1", "login_Id2")

val df2 = Seq(
  ("1234567", "TestUser1", "user1_Email"),
  ("1234568", "TestUser2", "user2_Email"),
  ("1234569", "TestUser3", "user3_Email"),
  ("1234570", "TestUser4", "user4_Email")
).toDF("login_Id", "user_name", "user_Email")

// Left outer join on login_Id1: flag 1 = login_Id1 matched, flag 9 = it didn't.
val dfOuter = df1.join(df2, $"login_Id1" === df2("login_Id"), "left_outer").
  withColumn("flag", when($"login_Id".isNull, lit(9)).otherwise(lit(1))).
  select("login_Id1", "login_Id2", "flag")
// +---------+---------+----+
// |login_Id1|login_Id2|flag|
// +---------+---------+----+
// |  1234567|  1234568|   1|
// |  1234567|     null|   1|
// |     null|  1234568|   9|
// |  1234569|  1000000|   1|
// |  1000000|  1234570|   9|
// |  1000000|  1000000|   9|
// +---------+---------+----+

// Left semi join keeps only the df1 rows whose login_Id2 has a match in df2 (flag 2).
val dfSemi = df1.join(df2, $"login_Id2" === df2("login_Id"), "left_semi").
  withColumn("flag", lit(2))
// +---------+---------+----+
// |login_Id1|login_Id2|flag|
// +---------+---------+----+
// |  1234567|  1234568|   2|
// |     null|  1234568|   2|
// |  1000000|  1234570|   2|
// +---------+---------+----+

// Within each (login_Id1, login_Id2) pair the lowest flag wins: a login_Id1
// match (1) beats a login_Id2 match (2), which beats no match at all (9).
val window = Window.partitionBy("login_Id1", "login_Id2").orderBy("flag")

// Deduplicate by flag, derive the winning login_Id, then join df2 for the non-key columns.
(dfOuter union dfSemi).
  withColumn("row_num", row_number.over(window)).
  where($"row_num" === 1).
  withColumn("login_Id", when($"flag" === 1, $"login_Id1").
    otherwise(when($"flag" === 2, $"login_Id2"))
  ).
  join(df2, Seq("login_Id"), "left_outer").
  select("login_Id1", "login_Id2", "login_Id", "user_name", "user_Email")
// +---------+---------+--------+---------+-----------+
// |login_Id1|login_Id2|login_Id|user_name| user_Email|
// +---------+---------+--------+---------+-----------+
// |  1000000|  1000000|    null|     null|       null|
// |  1000000|  1234570| 1234570|TestUser4|user4_Email|
// |  1234567|  1234568| 1234567|TestUser1|user1_Email|
// |  1234569|  1000000| 1234569|TestUser3|user3_Email|
// |  1234567|     null| 1234567|TestUser1|user1_Email|
// |     null|  1234568| 1234568|TestUser2|user2_Email|
// +---------+---------+--------+---------+-----------+

Note that you can apply broadcast to df2, as in your existing sample code, if it's significantly smaller than df1. In case df2 is small enough to be collected, the solution can be simplified to something like the following:

// Collect all known login ids to the driver (df2 assumed small).
val loginIdList = df2.collect.map(r => r.getAs[String](0))

// df1 rows where neither id appears in df2.
val df1Unmatched = df1.where(
  !$"login_Id1".isin(loginIdList: _*) && !$"login_Id2".isin(loginIdList: _*)
)

// Join the matchable rows, preferring login_Id1 over login_Id2,
// then re-attach the unmatched rows with nulls.
(df1 except df1Unmatched).
  join( broadcast(df2), $"login_Id1" === $"login_Id" ||
    ($"login_Id2" === $"login_Id" &&
      ($"login_Id1".isNull || !$"login_Id1".isin(loginIdList: _*))
    )
  ).
  union(
    df1Unmatched.join(df2, $"login_Id2" === $"login_Id", "left_outer")
  )

Upvotes: 1

Tzach Zohar

Reputation: 37842

You can use the coalesce function to create a new value that's either login_Id1 (if it's not null) or login_Id2 (if login_Id1 was null), and compare that result to login_Id:

import org.apache.spark.sql.functions._
import spark.implicits._

val res = DataFrame1.join(DataFrame2, coalesce($"login_Id1", $"login_Id2") === $"login_Id")

res.show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
|  1234567|     null| 1234567|TestUser1|user1_Email|
|  1234567|  1234568| 1234567|TestUser1|user1_Email|
|     null|  1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+
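If rows with no match should be kept with nulls, as in the expected output, the same condition works with a left outer join (a sketch along the same lines):

val resOuter = DataFrame1.join(
  broadcast(DataFrame2),
  coalesce($"login_Id1", $"login_Id2") === $"login_Id",
  "left_outer"
)

One caveat: coalesce always prefers login_Id1 when it is present, so a row such as (1000000, 1234568), where only login_Id2 matches, still comes back with nulls; the use case Leo pointed out is not covered by this approach.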

Upvotes: 0

Andy Hayden

Reputation: 375685

You only want the second column if the first is null; add that condition to your join clause:

@ df1.join(df2, df1("login_Id1") <=> df2("login_Id") || (df1("login_Id1").isNull && df1("login_Id2") <=> df2("login_Id"))).show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
|  1234567|  1234568| 1234567|TestUser1|user1_Email|
|  1234567|     null| 1234567|TestUser1|user1_Email|
|     null|  1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+

Note: the right-hand side of the condition matches this row only:

@ df1.join(df2, df1("login_Id1").isNull && df1("login_Id2") <=> df2("login_Id")).show()
+---------+---------+--------+---------+-----------+
|login_Id1|login_Id2|login_Id|user_name| user_Email|
+---------+---------+--------+---------+-----------+
|     null|  1234568| 1234568|TestUser2|user2_Email|
+---------+---------+--------+---------+-----------+
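For reference, <=> is Spark's null-safe equality operator (Column.eqNullSafe): unlike ===, it always evaluates to true or false, never null. A quick illustration with made-up data:

@ Seq(
    (Option("a"), Option("a")),
    (Option.empty[String], Option("a")),
    (Option.empty[String], Option.empty[String])
  ).toDF("x", "y").
    select(($"x" === $"y").as("eq"), ($"x" <=> $"y").as("null_safe_eq")).
    show()
+----+------------+
|  eq|null_safe_eq|
+----+------------+
|true|        true|
|null|       false|
|null|        true|
+----+------------+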

Upvotes: 0
