Saqib Ali

Reputation: 4428

Find all rows in a dataframe that meet a certain criteria in another dataframe

I have two dataframes as follows:

df1 (reference data)

Tempe, AZ, USA
San Jose, CA, USA
Mountain View, CA, USA
New York, NY, USA

df2 (User entered data)

Tempe, AZ
Tempe, Arizona
San Jose, USA
San Jose, CA
Mountain View, CA

I would like to get a dataframe (df3) as following:

-------------------------------------------
|Tempe, AZ, USA        | Tempe, Arizona   |
|Tempe, AZ, USA        | Tempe, AZ        |
|San Jose, CA, USA     | San Jose, CA     |
|San Jose, CA, USA     | San Jose, USA    |
|Mountain View, CA, USA| Mountain View, CA|
-------------------------------------------

I already have a user-defined function:

def isSameAs(str1: String, str2: String): Boolean = {
    ......
}

that takes two strings (user-entered data and reference data) and tells me whether they match.

I just need to find the right way to implement the map in Scala Spark SQL so that I get a dataframe like df3.

Upvotes: 1

Views: 244

Answers (2)

werner

Reputation: 14845

Option 1: You can use a UDF as the join expression:

import org.apache.spark.sql.functions._
val isSameAsUdf = udf(isSameAs(_,_))
val result = df1.join(df2, isSameAsUdf(df1.col("address"), df2.col("address")))

The downside of this approach is that Spark computes the cartesian product of df1 and df2 and only afterwards filters out the rows that do not satisfy the join condition. Running result.explain prints

== Physical Plan ==
CartesianProduct UDF(address#4, address#10)
:- LocalTableScan [address#4]
+- LocalTableScan [address#10]
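
To see why the plan above is expensive, the join can be pictured on plain Scala collections: every reference address is paired with every user-entered address, and only then is the predicate applied. This is a minimal sketch, not Spark code. isSameAsStandIn is a hypothetical placeholder for the asker's isSameAs (it only compares the text before the first comma), and the data are the sample addresses from the question.

```scala
// Hypothetical stand-in for the asker's isSameAs: compares only the city part.
def isSameAsStandIn(ref: String, user: String): Boolean =
  ref.takeWhile(_ != ',').trim.equalsIgnoreCase(user.takeWhile(_ != ',').trim)

val reference = Seq("Tempe, AZ, USA", "San Jose, CA, USA", "Mountain View, CA, USA", "New York, NY, USA")
val userEntered = Seq("Tempe, AZ", "Tempe, Arizona", "San Jose, USA", "San Jose, CA", "Mountain View, CA")

// Cartesian product: all reference.size * userEntered.size pairs are generated ...
val allPairs = for (r <- reference; u <- userEntered) yield (r, u)
// ... and the predicate is evaluated once per pair.
val matches = allPairs.filter { case (r, u) => isSameAsStandIn(r, u) }

println(s"${allPairs.size} pairs compared, ${matches.size} kept") // 20 pairs compared, 5 kept
```

The number of predicate calls grows with the product of the two row counts, which is what makes this option slow for large inputs.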

Option 2: To avoid the cartesian product, it might be faster to broadcast the reference data as a standard Scala sequence and then map the addresses in another UDF:

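import spark.implicits._ // assumes the SparkSession is named spark; needed for .as[String] and 'address

// Reference data (df1) collected to the driver as a plain Scala sequence
val normalizedAddress: Seq[String] = df1.select("address").as[String].collect().toSeq
val broadcastSeq = spark.sparkContext.broadcast(normalizedAddress)

// Returns the first reference address that matches, or "" if none does
def toNormalizedAddress(str: String): String =
    broadcastSeq.value.find(isSameAs(_, str)).getOrElse("")
val toNormalizedAddressUdf = udf(toNormalizedAddress(_))

val result2 = df2.withColumn("NormalizedAddress", toNormalizedAddressUdf('address))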

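For the sample data the matched pairs are the same as for option 1 (a df2 row without any match would be kept with an empty NormalizedAddress instead of being dropped), but result2.explain prints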

== Physical Plan ==
LocalTableScan [address#10, NormalizedAddress#40]

This second option works only if the reference data is small enough to be broadcast. Depending on the cluster's hardware, a few tens of thousands of lines of reference data would still be considered small.
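
The per-row lookup that the UDF in option 2 performs can be tried out without a cluster. The following is a sketch on plain Scala collections under stated assumptions: isSameAsStandIn is a hypothetical placeholder for the asker's isSameAs, referenceAddresses plays the role of broadcastSeq.value, and "Boston, MA" is a made-up input added only to show the unmatched case.

```scala
// Hypothetical stand-in for the asker's isSameAs: compares only the city part.
def isSameAsStandIn(ref: String, user: String): Boolean =
  ref.takeWhile(_ != ',').trim.equalsIgnoreCase(user.takeWhile(_ != ',').trim)

// Plays the role of broadcastSeq.value: the small reference list held in memory.
val referenceAddresses = Seq("Tempe, AZ, USA", "San Jose, CA, USA", "Mountain View, CA, USA", "New York, NY, USA")

// Same shape as toNormalizedAddress: first matching reference address, else "".
def toNormalized(userAddress: String): String =
  referenceAddresses.find(isSameAsStandIn(_, userAddress)).getOrElse("")

// "Boston, MA" is an invented example with no counterpart in the reference data.
val userEntered = Seq("Tempe, Arizona", "San Jose, USA", "Boston, MA")
val normalized = userEntered.map(a => a -> toNormalized(a))
normalized.foreach { case (user, ref) => println(s"$user -> '$ref'") }
```

Two things are worth noticing: find stops at the first match, so a user address can never be paired with more than one reference address, and an unmatched address maps to an empty string, so the row stays in the output.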

Upvotes: 2

Som

Reputation: 6323

Assuming the schema below (address: string), try this:

Load the data

    val data1 =
      """Tempe, AZ, USA
        |San Jose, CA, USA
        |Mountain View, CA, USA""".stripMargin
    val df1 = data1.split(System.lineSeparator()).toSeq.toDF("address")
    df1.show(false)
    /**
      * +----------------------+
      * |address               |
      * +----------------------+
      * |Tempe, AZ, USA        |
      * |San Jose, CA, USA     |
      * |Mountain View, CA, USA|
      * +----------------------+
      */

    val data2 =
      """Tempe, AZ
        |Tempe, Arizona
        |San Jose, USA
        |San Jose, CA
        |Mountain View, CA""".stripMargin

    val df2 = data2.split(System.lineSeparator()).toSeq.toDF("address")
    df2.show(false)

    /**
      * +-----------------+
      * |address          |
      * +-----------------+
      * |Tempe, AZ        |
      * |Tempe, Arizona   |
      * |San Jose, USA    |
      * |San Jose, CA     |
      * |Mountain View, CA|
      * +-----------------+
      */

Extract the joining key and join based on that


    df1.withColumn("joiningKey", substring_index($"address", ",", 1))
      .join(
        df2.withColumn("joiningKey", substring_index($"address", ",", 1)),
        "joiningKey"
      )
      .select(df1("address"), df2("address"))
      .show(false)

    /**
      * +----------------------+-----------------+
      * |address               |address          |
      * +----------------------+-----------------+
      * |Tempe, AZ, USA        |Tempe, AZ        |
      * |Tempe, AZ, USA        |Tempe, Arizona   |
      * |San Jose, CA, USA     |San Jose, USA    |
      * |San Jose, CA, USA     |San Jose, CA     |
      * |Mountain View, CA, USA|Mountain View, CA|
      * +----------------------+-----------------+
      */
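
What the joining key does can be checked on plain Scala collections. This is a rough sketch rather than Spark code: joiningKey is a hypothetical helper that mirrors substring_index(address, ",", 1) by taking the text before the first comma, and the grouping step stands in for an equality-based join, which does not need a cartesian product.

```scala
// Mirrors substring_index(address, ",", 1): the text before the first comma.
def joiningKey(address: String): String = address.takeWhile(_ != ',')

val reference = Seq("Tempe, AZ, USA", "San Jose, CA, USA", "Mountain View, CA, USA")
val userEntered = Seq("Tempe, AZ", "Tempe, Arizona", "San Jose, USA", "San Jose, CA", "Mountain View, CA")

// Index the reference data by key once, then look each user row up by key.
val referenceByKey: Map[String, Seq[String]] = reference.groupBy(joiningKey)
val joined = for {
  u <- userEntered
  r <- referenceByKey.getOrElse(joiningKey(u), Seq.empty)
} yield (r, u)

joined.foreach { case (r, u) => println(s"$r | $u") }

// Caveat: keys are compared exactly, so case or spacing differences do not match.
println(joiningKey("tempe, AZ") == joiningKey("Tempe, AZ, USA")) // false
```

This approach only works when matching can be reduced to equality on a derived key; anything fuzzier still needs a predicate such as isSameAs.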

Upvotes: 1
