Prateek Pathak

Reputation: 111

Spark dataframe inner join without duplicate match

I want to join two DataFrames based on a certain condition in Spark Scala. The catch is that once a row in df1 matches any row in df2, it should not be matched against any other row in df2. Below are the sample data and the outcome I am trying to get.

   DF1
--------------------------------
Emp_id | Emp_Name | Address_id
1      |  ABC     |   1
2      |  DEF     |   2
3      |  PQR     |   3
4      |  XYZ     |   1

   DF2
-----------------------
Address_id | City 
1          | City_1
1          | City_2
2          | City_3
REST       | Some_City

  Output DF
----------------------------------------
Emp_id | Emp_Name | Address_id | City
1      |  ABC     |   1        | City_1
2      |  DEF     |   2        | City_3
3      |  PQR     |   3        | Some_City
4      |  XYZ     |   1        | City_1 

Note: REST is like a wildcard. Any value can match REST.

So in the above sample, Emp_Name "ABC" could match City_1, City_2, or Some_City. The output DF contains only City_1 because it is found first.
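A minimal sketch of this rule in Spark Scala, assuming the sample data above and assuming that "finds it first" means an exact Address_id match is preferred over the REST wildcard, with ties broken by city name:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number, when}

val spark = SparkSession.builder().master("local").appName("wildcard-join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "ABC", "1"), (2, "DEF", "2"), (3, "PQR", "3"), (4, "XYZ", "1"))
  .toDF("Emp_id", "Emp_Name", "Address_id")
val df2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST", "Some_City"))
  .toDF("Address_id", "City")

// Match on Address_id equality OR on the "REST" wildcard row.
val joined = df1.join(
  df2,
  df1("Address_id") === df2("Address_id") || df2("Address_id") === lit("REST"),
  "inner")

// Rank the candidate cities per employee (exact matches before "REST", then by City)
// and keep only the first one, so each df1 row is matched exactly once.
val w = Window
  .partitionBy(df1("Emp_id"))
  .orderBy(when(df2("Address_id") === "REST", 1).otherwise(0), df2("City"))

joined
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .select(df1("Emp_id"), df1("Emp_Name"), df1("Address_id"), df2("City"))
  .orderBy("Emp_id")
  .show(false)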

Upvotes: 1

Views: 1284

Answers (2)

mvasyliv

Reputation: 1214

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object JoinTwoDataFrame extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val df1 = Seq(
    (1, "ABC", "1"),
    (2, "DEF", "2"),
    (3, "PQR", "3"),
    (4, "XYZ", "1")
  ).toDF("Emp_id", "Emp_Name", "Address_id")

  val df2 = Seq(
    ("1", "City_1"),
    ("1", "City_2"),
    ("2", "City_3"),
    ("REST", "Some_City")
  ).toDF("Address_id", "City")

  // City of the "REST" wildcard row; used as a fallback for unmatched addresses.
  val restCity: Option[String] = Some(df2.filter('Address_id.equalTo("REST")).select('City).first()(0).toString)

  val res = df1.join(df2, df1.col("Address_id") === df2.col("Address_id"), "left_outer")
    .select(
      df1.col("Emp_id"),
      df1.col("Emp_Name"),
      df1.col("Address_id"),
      df2.col("City")
    )
    // Employees without an exact Address_id match fall back to the "REST" city.
    .withColumn("city2", when('City.isNotNull, 'City).otherwise(restCity.getOrElse("")))
    .drop("City")
    .withColumnRenamed("city2", "City")
    // Collect the candidate cities per employee and keep only the first one.
    .orderBy("Address_id", "City")
    .groupBy("Emp_id", "Emp_Name", "Address_id")
    .agg(collect_list("City").alias("cityList"))
    .withColumn("City", 'cityList.getItem(0))
    .drop("cityList")
    .orderBy("Emp_id")

  res.show(false)

  //  +------+--------+----------+---------+
  //  |Emp_id|Emp_Name|Address_id|City     |
  //  +------+--------+----------+---------+
  //  |1     |ABC     |1         |City_1   |
  //  |2     |DEF     |2         |City_3   |
  //  |3     |PQR     |3         |Some_City|
  //  |4     |XYZ     |1         |City_1   |
  //  +------+--------+----------+---------+

}

Upvotes: 0

Kamal Kunjapur

Reputation: 8860

You seem to have custom logic for your join. Basically I've come up with the below UDF.

Note that you may want to change the logic of the UDF as per your requirement; a more general sketch follows the output below.

import spark.implicits._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.first

//dataframe 1    
val df_1 = Seq(("1", "ABC", "1"), ("2", "DEF", "2"), ("3", "PQR", "3"), ("4", "XYZ", "1")).toDF("Emp_Id", "Emp_Name", "Address_Id")

//dataframe 2
val df_2 = Seq(("1", "City_1"), ("1", "City_2"), ("2", "City_3"), ("REST","Some_City")).toDF("Address_Id", "City_Name")

// UDF logic: Address_Ids "1" and "2" must match exactly; any other Address_Id
// falls through to the "REST" wildcard row.
val join_udf = udf((a: String, b: String) => {
  (a, b) match {
    case ("1", "1")  => true
    case ("1", _)    => false
    case ("2", "2")  => true
    case ("2", _)    => false
    case (_, "REST") => true
    case (_, _)      => false
  }
})

val dataframe_join = df_1.join(df_2, join_udf(df_1("Address_Id"), df_2("Address_Id")), "inner")
                         .drop(df_2("Address_Id"))
                         .orderBy($"City_Name")
                         .groupBy($"Emp_Id", $"Emp_Name", $"Address_Id")
                         .agg(first($"City_Name"))
                         .orderBy($"Emp_Id")

dataframe_join.show(false)

Basically, after applying the UDF, what you get is all possible combinations of the matches.
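For instance, you can inspect those intermediate matches before the aggregation (a sketch reusing the df_1, df_2 and join_udf definitions above; row order is not guaranteed):

// Sketch: raw join output before the groupBy/first step.
val raw_join = df_1
  .join(df_2, join_udf(df_1("Address_Id"), df_2("Address_Id")), "inner")
  .drop(df_2("Address_Id"))

raw_join.show(false)
// Expected matches on the sample data (order may vary):
// Emp_Id 1 -> City_1, City_2
// Emp_Id 2 -> City_3
// Emp_Id 3 -> Some_City
// Emp_Id 4 -> City_1, City_2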

After that, when you apply groupBy and use the first function in agg, you get only the filtered values you are looking for.

+------+--------+----------+-----------------------+
|Emp_Id|Emp_Name|Address_Id|first(City_Name, false)|
+------+--------+----------+-----------------------+
|1     |ABC     |1         |City_1                 |
|2     |DEF     |2         |City_3                 |
|3     |PQR     |3         |Some_City              |
|4     |XYZ     |1         |City_1                 |
+------+--------+----------+-----------------------+
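As noted above, the UDF logic can be adapted. One possible generalization, a sketch based on my reading of the rule (exact Address_Id matches win, and "REST" only applies to Address_Ids that have no exact entry in df_2), avoids hardcoding the ids:

// Assumption: "REST" is a fallback only for Address_Ids with no exact match in df_2.
val exactIds: Set[String] = df_2
  .filter($"Address_Id" =!= "REST")
  .select($"Address_Id")
  .as[String]
  .collect()
  .toSet

val join_udf_general = udf((a: String, b: String) =>
  b == a || (b == "REST" && !exactIds.contains(a))
)

Swapping join_udf_general for join_udf in the join above should produce the same matches on the sample data.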

Note that I've made use of Spark 2.3 and hope this helps!

Upvotes: 1
