Reputation: 848
I have the following problem.
A Spark data frame has two columns, source and destination, both of type string. It holds directional mappings of values, so it can contain both a -> b and b -> a.
In my output dataset I don't want both records. If I have a -> b and b -> a, I need to retain only one of them. Similarly, if I have b -> c and c -> b, I have to retain only one.
I have the program below, but it does not eliminate the duplicates, and I'm not sure where I am making a mistake. If there is a better way of implementing this, please guide me.
flightSummary is the data frame from which I get source and destination.
import scala.collection.mutable.{Map => CMap}

var map: CMap[String, String] = CMap()

def check(map: CMap[String, String], a: String, b: String) = {
  var found = false
  for (e <- map)
    if ((e._1 == a && e._2 == b) || (e._2 == a && e._1 == b))
      found = true
  found
}

flightSummary
  .selectExpr("source", "dest")
  .filter { (r: Row) =>
    {
      if (map.size == 0) {
        map += (r(0).asInstanceOf[String] -> r(1).asInstanceOf[String])
        true
      }
      else {
        if (check(map, r(0).asInstanceOf[String], r(1).asInstanceOf[String]) || check(map, r(1).asInstanceOf[String], r(0).asInstanceOf[String]))
          false
        else {
          map += (r(0).asInstanceOf[String] -> r(1).asInstanceOf[String])
          true
        }
      }
    }
  }
  .show(200)
Upvotes: 0
Views: 150
Reputation: 18108
You can do this functionally, using SQL or the DataFrame API as appropriate.
Upvotes: 2
Reputation: 42422
Try if this works:
flightSummary
  .selectExpr("source", "dest", "array_sort(array(source, dest)) as map")
  .dropDuplicates("map")
  .drop("map")
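To show why sorting the pair works, here is a minimal sketch of the same idea on plain Scala collections, without Spark. The names UndirectedPairs, canonicalKey and dedupe are made up for illustration and are not part of any Spark API. Each row is reduced to an order-independent key, so a -> b and b -> a collapse to the same key, and only the first row per key is kept.

```scala
object UndirectedPairs {
  // Order-independent key: the two endpoints in sorted order,
  // so (a, b) and (b, a) both map to (a, b).
  def canonicalKey(source: String, dest: String): (String, String) =
    if (source.compareTo(dest) <= 0) (source, dest) else (dest, source)

  // Keep the first row seen for each canonical key, preserving input order.
  // mutable.Set.add returns true only when the key was not already present.
  def dedupe(rows: Seq[(String, String)]): Seq[(String, String)] = {
    val seen = scala.collection.mutable.Set.empty[(String, String)]
    rows.filter { case (s, d) => seen.add(canonicalKey(s, d)) }
  }
}
```

For example, UndirectedPairs.dedupe(Seq(("a", "b"), ("b", "a"), ("b", "c"))) keeps ("a", "b") and ("b", "c"). One difference from the Spark version: this sketch always keeps the first occurrence, whereas dropDuplicates does not promise which of the duplicate rows survives.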
Upvotes: 1