elcomendante

Reputation: 1161

Spark: How to merge two DataFrames given a condition based on a function result?

How do I add the rows of dataFrameToAdd to dataFrameMain, except for rows that have the same label as a row of dataFrameMain and lie within a distance res of less than 0.0002 of it?

import sqlContext.implicits._ // provides .toDF() on RDDs of case classes

case class Schema(name: String, label: String, lat: Double, lon: Double)

val dataFrameMain = sc.parallelize(Array(
  Schema("recordA", "house", 54.78049, -1.57679),
  Schema("recordB", "hotel", 52.02724, -2.16572),
  Schema("recordC", "hotel", 52.51423, -1.97814),
  Schema("recordD", "house", 51.46966, -0.45227),
  Schema("recordE", "house", 50.91608, -1.45803),
  Schema("recordF", "house", 52.59754, -1.07599)
)).toDF()

val dataFrameToAdd = sc.parallelize(Array(
  Schema("recordAduplicate", "house", 54.780705, -1.576777),
  Schema("recordBnotDuplicate", "hotel", 54.783477, -1.57986)
)).toDF()

// Euclidean distance treating (lat, lon) as planar coordinates, so the result is in degrees
def distance(latDF: Double, lonDF: Double, latNEW: Double, lonNEW: Double): Double = {
  val dx = latNEW - latDF
  val dy = lonNEW - lonDF
  math.sqrt(dx * dx + dy * dy)
}

// Register the function so it can be called from SQL queries
sqlContext.udf.register("distance", distance _)

I am not sure how to approach this problem. Should I apply a transpose function, or perhaps use MLlib matrix data structures? In this example, recordBnotDuplicate from dataFrameToAdd should be merged into dataFrameMain because its distance is more than 0.0002, but recordAduplicate should not, because it has the same label as recordA from dataFrameMain and its distance is less than 0.0002.
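To check which of the new rows the threshold actually separates, here is a minimal plain-Scala sketch (no Spark, in-memory Seqs only) that reuses the question's distance formula and finds, for each row of dataFrameToAdd, the closest row of dataFrameMain with the same label. `NearestCheck`, `Record` and `nearestSameLabel` are hypothetical names used only for illustration. With the sample data, recordAduplicate comes out about 0.000215 from recordA, slightly above 0.0002, so a strict 0.0002 cutoff would not treat it as a duplicate.

```scala
object NearestCheck {
  // Hypothetical local stand-in for the question's Schema case class
  case class Record(name: String, label: String, lat: Double, lon: Double)

  // Same planar Euclidean formula as the question's distance UDF
  def distance(latDF: Double, lonDF: Double, latNEW: Double, lonNEW: Double): Double = {
    val dx = latNEW - latDF
    val dy = lonNEW - lonDF
    math.sqrt(dx * dx + dy * dy)
  }

  val mainRows = Seq(
    Record("recordA", "house", 54.78049, -1.57679),
    Record("recordB", "hotel", 52.02724, -2.16572),
    Record("recordC", "hotel", 52.51423, -1.97814),
    Record("recordD", "house", 51.46966, -0.45227),
    Record("recordE", "house", 50.91608, -1.45803),
    Record("recordF", "house", 52.59754, -1.07599)
  )

  val toAddRows = Seq(
    Record("recordAduplicate", "house", 54.780705, -1.576777),
    Record("recordBnotDuplicate", "hotel", 54.783477, -1.57986)
  )

  // Hypothetical helper: smallest distance from r to any row of mainRows
  // with the same label, or None if no row has that label
  def nearestSameLabel(r: Record): Option[Double] = {
    val ds = mainRows.filter(_.label == r.label).map(m => distance(m.lat, m.lon, r.lat, r.lon))
    if (ds.isEmpty) None else Some(ds.min)
  }

  def main(args: Array[String]): Unit =
    toAddRows.foreach { r =>
      val d = nearestSameLabel(r).map(x => f"$x%.6f").getOrElse("no same-label row")
      println(s"${r.name}: $d")
    }
}
```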

Upvotes: 1

Views: 6697

Answers (1)

Tzach Zohar

Reputation: 37822

Here's one way, though I'm not sure it's the most efficient one performance-wise:

Right after registering the UDF, register each DataFrame as a temporary table and use a LEFT JOIN to select all of B's records that don't match any of A's records. Then union the result with A:

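import org.apache.spark.sql.DataFrame

dataFrameMain.registerTempTable("a")
dataFrameToAdd.registerTempTable("b")

// LEFT JOIN + "a.name IS NULL" keeps only the b rows that matched no a row (an anti-join).
// Threshold is 0.002: recordAduplicate is ~0.000215 away from recordA, so 0.0002 would keep it.
val withoutDuplicates: DataFrame = sqlContext.sql(
  """
    |SELECT b.*
    |FROM b
    |LEFT JOIN a ON a.label = b.label AND distance(a.lat, a.lon, b.lat, b.lon) <= 0.002
    |WHERE a.name IS NULL
  """.stripMargin)

// Spark 2.0+: use createOrReplaceTempView instead of registerTempTable, and union instead of unionAll
val result = withoutDuplicates.unionAll(dataFrameMain)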

Printing the result with result.show() gives the expected output:

+-------------------+-----+---------+--------+
|               name|label|      lat|     lon|
+-------------------+-----+---------+--------+
|recordBnotDuplicate|hotel|54.783477|-1.57986|
|            recordA|house| 54.78049|-1.57679|
|            recordB|hotel| 52.02724|-2.16572|
|            recordC|hotel| 52.51423|-1.97814|
|            recordD|house| 51.46966|-0.45227|
|            recordE|house| 50.91608|-1.45803|
|            recordF|house| 52.59754|-1.07599|
+-------------------+-----+---------+--------+
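The LEFT JOIN ... WHERE a.name IS NULL query is an anti-join: a row of b survives only if no row of a satisfied the join condition. As a rough way to check those semantics without a cluster, here is a minimal plain-Scala sketch over in-memory Seqs (an illustration, not Spark code); `AntiJoinSketch`, `Record`, `withoutDuplicates` and `merge` are hypothetical names. Running it with the 0.002 threshold yields the same names, in the same order, as the table above.

```scala
object AntiJoinSketch {
  // Hypothetical local stand-in for the question's Schema case class
  case class Record(name: String, label: String, lat: Double, lon: Double)

  // Same planar Euclidean formula as the question's distance UDF
  def distance(latDF: Double, lonDF: Double, latNEW: Double, lonNEW: Double): Double = {
    val dx = latNEW - latDF
    val dy = lonNEW - lonDF
    math.sqrt(dx * dx + dy * dy)
  }

  // Rows of incoming with NO same-label row of existing within threshold,
  // mirroring LEFT JOIN a ON <condition> ... WHERE a.name IS NULL
  def withoutDuplicates(existing: Seq[Record], incoming: Seq[Record], threshold: Double): Seq[Record] =
    incoming.filterNot { b =>
      existing.exists(a => a.label == b.label && distance(a.lat, a.lon, b.lat, b.lon) <= threshold)
    }

  // Mirrors withoutDuplicates.unionAll(dataFrameMain): surviving new rows first, then the existing rows
  def merge(existing: Seq[Record], incoming: Seq[Record], threshold: Double): Seq[Record] =
    withoutDuplicates(existing, incoming, threshold) ++ existing

  val mainRows = Seq(
    Record("recordA", "house", 54.78049, -1.57679),
    Record("recordB", "hotel", 52.02724, -2.16572),
    Record("recordC", "hotel", 52.51423, -1.97814),
    Record("recordD", "house", 51.46966, -0.45227),
    Record("recordE", "house", 50.91608, -1.45803),
    Record("recordF", "house", 52.59754, -1.07599)
  )

  val toAddRows = Seq(
    Record("recordAduplicate", "house", 54.780705, -1.576777),
    Record("recordBnotDuplicate", "hotel", 54.783477, -1.57986)
  )

  def main(args: Array[String]): Unit =
    merge(mainRows, toAddRows, 0.002).foreach(r => println(r.name))
}
```

The nested `exists` compares every new row with every existing row, which is fine for checking the logic on a handful of rows but is not meant as a replacement for the Spark join on real data.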

Upvotes: 3
