user3103957

Reputation: 848

Eliminating records in a Spark DataFrame

I have the following problem.

A Spark DataFrame has two columns, source and destination, both of type string. Each row is a directional mapping of values, so the frame can contain both a -> b and b -> a.

In my output dataset, I don't want both records: if I have a -> b and b -> a, then I need to retain only one. Similarly, if I have b -> c and c -> b, I have to retain only one.

I have the program below, but it does not eliminate the duplicates, and I'm not sure where I'm making a mistake. Also, if there is a better way of implementing this, please guide me.

flightSummary is the DataFrame from which I get the source and dest columns.
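To illustrate, here is a minimal, made-up sample of what flightSummary might look like (the real data has many more rows):

   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   import spark.implicits._

   // Hypothetical sample: (a, b)/(b, a) and (b, c)/(c, b) should each
   // collapse to a single row in the output.
   val flightSummary = Seq(
     ("a", "b"),
     ("b", "a"),
     ("b", "c"),
     ("c", "b"),
     ("a", "c")
   ).toDF("source", "dest")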

   import scala.collection.mutable.{Map => CMap}
   import org.apache.spark.sql.Row

   var map: CMap[String, String] = CMap()

   def check(map: CMap[String, String], a: String, b: String) = {
     var found = false
     for (e <- map)
       if ((e._1 == a && e._2 == b) || (e._2 == a && e._1 == b))
         found = true
     found
   }

   flightSummary
     .selectExpr("source", "dest")
     .filter { (r: Row) =>
       if (map.size == 0) {
         map += (r(0).asInstanceOf[String] -> r(1).asInstanceOf[String])
         true
       } else {
         if (check(map, r(0).asInstanceOf[String], r(1).asInstanceOf[String]) ||
             check(map, r(1).asInstanceOf[String], r(0).asInstanceOf[String]))
           false
         else {
           map += (r(0).asInstanceOf[String] -> r(1).asInstanceOf[String])
           true
         }
       }
     }
     .show(200)

Upvotes: 0

Views: 150

Answers (2)

Ged

Reputation: 18108

You can do this functionally:

  • process each row into a plain pair of columns c1, c2
  • reverse the tuple (c1, c2) for which c1 < c2, so that every pair ends up in a canonical order
  • then apply a distinct, and Bob's your uncle

You can use either SQL or the DataFrame API, as appropriate; a rough sketch follows.
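For example, a minimal sketch with the DataFrame API (flightSummary, source and dest are the names from the question; least/greatest is just one way to get the canonical ordering):

   import org.apache.spark.sql.functions.{col, greatest, least}

   // Put each (source, dest) pair into canonical order so that
   // (a, b) and (b, a) map to the same (c1, c2), then deduplicate.
   val deduped = flightSummary
     .select(
       least(col("source"), col("dest")).as("c1"),
       greatest(col("source"), col("dest")).as("c2")
     )
     .distinct()

   deduped.show()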

Upvotes: 2

mck

Reputation: 42422

Try this and see if it works:

   flightSummary
     .selectExpr("source", "dest", "array_sort(array(source, dest)) as map")
     .dropDuplicates("map")
     .drop("map")
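
The idea is that array_sort(array(source, dest)) builds an order-independent key: a -> b and b -> a both sort to [a, b], so dropDuplicates on that key keeps only one of them, and the helper column is dropped afterwards.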

Upvotes: 1
