Reputation: 1161
How can I add the records of dataFrameToAdd to dataFrameMain, skipping any record for which dataFrameMain already holds a record with the same label at a distance (res) of less than 0.0002?
case class Schema(name: String, label: String, lat: Double, lon: Double)

val dataFrameMain = sc.parallelize(Array(
  Schema("recordA", "house", 54.78049, -1.57679),
  Schema("recordB", "hotel", 52.02724, -2.16572),
  Schema("recordC", "hotel", 52.51423, -1.97814),
  Schema("recordD", "house", 51.46966, -0.45227),
  Schema("recordE", "house", 50.91608, -1.45803),
  Schema("recordF", "house", 52.59754, -1.07599)
)).toDF()

val dataFrameToAdd = sc.parallelize(Array(
  Schema("recordAduplicate", "house", 54.780705, -1.576777),
  Schema("recordBnotDuplicate", "hotel", 54.783477, -1.57986)
)).toDF()
// Planar (Euclidean) distance between two points, measured directly in degrees
def distance(latDF: Double, lonDF: Double, latNEW: Double, lonNEW: Double): Double = {
  val dx = latNEW - latDF
  val dy = lonNEW - lonDF
  val res = math.sqrt(dx * dx + dy * dy)
  res
}

// Register the function so that it can be called as distance(...) inside SQL queries
sqlContext.udf.register("distance", distance(_: Double, _: Double, _: Double, _: Double): Double)
I am not sure how to approach this problem. Should I apply a transpose function, or perhaps use MLlib matrix data structures? As output from this example, recordBnotDuplicate from dataFrameToAdd should be merged into dataFrameMain because its distance is more than 0.0002, but recordAduplicate should not, because it has the same label as recordA from dataFrameMain and its distance value is less than 0.0002.
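The rule described above can be written down without Spark first, which may make the join logic easier to reason about. The following is only a sketch on plain Scala collections, not a distributed implementation: `Record`, `MergeRuleSketch`, `isDuplicate`, and `merge` are illustrative names that do not appear in the original code, and the threshold is passed in as a parameter rather than fixed.

```scala
// Plain-Scala sketch of the merge rule (no Spark); all names here are illustrative.
case class Record(name: String, label: String, lat: Double, lon: Double)

object MergeRuleSketch {
  // Same planar distance formula as the question's `distance` function.
  def distance(a: Record, b: Record): Double = {
    val dx = b.lat - a.lat
    val dy = b.lon - a.lon
    math.sqrt(dx * dx + dy * dy)
  }

  // A candidate counts as a duplicate if some existing record has the
  // same label and lies closer than `threshold`.
  def isDuplicate(candidate: Record, existing: Seq[Record], threshold: Double): Boolean =
    existing.exists(e => e.label == candidate.label && distance(e, candidate) < threshold)

  // Keep every existing record and append only the non-duplicate candidates.
  def merge(existing: Seq[Record], candidates: Seq[Record], threshold: Double): Seq[Record] =
    existing ++ candidates.filterNot(c => isDuplicate(c, existing, threshold))
}
```

Every candidate is compared against every existing record here, so this is meant for checking the rule on small inputs rather than as a replacement for a join.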
Upvotes: 1
Views: 6697
Reputation: 37822
Here's one way, though I'm not sure it's the most efficient one performance-wise:
Right after registering the UDF, register each DataFrame as a temporary table and use a LEFT JOIN to select all of b's records that don't match any of a's records; then union the result with a:
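import org.apache.spark.sql.DataFrame

dataFrameMain.registerTempTable("a")
dataFrameToAdd.registerTempTable("b")

// Rows of b without a same-label match in a come back with NULLs in a's columns;
// those are the rows to keep. Note that the threshold used here is 0.002.
val withoutDuplicates: DataFrame = sqlContext.sql(
  """
    |SELECT b.*
    |FROM b
    |LEFT JOIN a ON a.label = b.label AND distance(a.lat, a.lon, b.lat, b.lon) <= 0.002
    |WHERE a.name IS NULL
  """.stripMargin)

// Append the existing records to the new, non-duplicate ones
val result = withoutDuplicates.unionAll(dataFrameMain)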
Printing the result gives the expected output:
+-------------------+-----+---------+--------+
|               name|label|      lat|     lon|
+-------------------+-----+---------+--------+
|recordBnotDuplicate|hotel|54.783477|-1.57986|
|            recordA|house| 54.78049|-1.57679|
|            recordB|hotel| 52.02724|-2.16572|
|            recordC|hotel| 52.51423|-1.97814|
|            recordD|house| 51.46966|-0.45227|
|            recordE|house| 50.91608|-1.45803|
|            recordF|house| 52.59754|-1.07599|
+-------------------+-----+---------+--------+
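The join condition compares against 0.002, while the question mentions 0.0002. A quick check with the sample coordinates, using the same formula as the question's `distance` function, suggests why the looser value is needed for recordAduplicate to be filtered out. This is a standalone sketch in plain Scala; the object name `ThresholdCheck` and the value name `dDuplicate` are made up for illustration.

```scala
object ThresholdCheck {
  // Same planar distance formula as in the question.
  def distance(latDF: Double, lonDF: Double, latNEW: Double, lonNEW: Double): Double = {
    val dx = latNEW - latDF
    val dy = lonNEW - lonDF
    math.sqrt(dx * dx + dy * dy)
  }

  // recordA vs. recordAduplicate, coordinates copied from the question.
  val dDuplicate: Double = distance(54.78049, -1.57679, 54.780705, -1.576777)

  def main(args: Array[String]): Unit = {
    println(s"distance recordA -> recordAduplicate: $dDuplicate")
    println(s"below 0.0002: ${dDuplicate < 0.0002}")
    println(s"within 0.002: ${dDuplicate <= 0.002}")
  }
}
```

With these inputs the distance comes out at roughly 0.000215, which is slightly above 0.0002 but well within 0.002, so a threshold of 0.0002 would not treat recordAduplicate as a duplicate of recordA.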
Upvotes: 3