Hassan Jalil

Reputation: 1194

Join on two RDDs using Scala in Spark

I am trying to implement Local Outlier Factor on Spark. I have a set of points that I read from a file, and for each point I find its N nearest neighbors. Each point is given an index using the zipWithIndex() command.

So now I have two RDDs. The first:

RDD[(Index:Long, Array[(NeighborIndex:Long, Distance:Double)])]

Here, the Long represents the point's index, and the Array consists of its N nearest neighbors, each Long being a neighbor's index and each Double that neighbor's distance from the given point.

The second:

RDD[(Index:Long,LocalReachabilityDensity:Double)]

Here, the Long again represents the index of a given point, and the Double represents its local reachability density.

What I want is an RDD that contains all the points, each paired with an array of its N closest neighbors and their local reachability densities:

RDD[(Index:Long, Array[(NeighborIndex:Long,LocalReachabilityDensity:Double)])]

So here, the Long would represent the index of a point, and the array would hold its N closest neighbors, with their index values and local reachability densities.

As I understand it, I need to run a map on the first RDD and then join the values in its array with the second RDD, which contains the local reachability densities, so that I get the local reachability density for each of the N neighbor indexes. But I am not sure how to achieve this. If anyone can help me out, that would be great.
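For concreteness, a toy version of the two inputs and the desired output might look like this (all the indices, distances, and densities here are made up, and sc is assumed to be an existing SparkContext):

// Hypothetical example data, purely for illustration.
val neighbors = sc.parallelize(Seq(
  (0L, Array((1L, 0.5), (2L, 1.2))), // point 0's two nearest neighbors
  (1L, Array((0L, 0.5), (2L, 0.9)))  // point 1's two nearest neighbors
))
val densities = sc.parallelize(Seq((0L, 1.1), (1L, 0.8), (2L, 1.4)))

// Desired result:
// (0L, Array((1L, 0.8), (2L, 1.4)))
// (1L, Array((0L, 1.1), (2L, 1.4)))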

Upvotes: 0

Views: 672

Answers (1)

Vidya

Reputation: 30300

Given:

import org.apache.spark.rdd.RDD

val rdd1: RDD[(Long, Array[(Long, Double)])] = ... // (index, Array[(neighborIndex, distance)])
val rdd2: RDD[(Long, Double)] = ...                // (index, localReachabilityDensity)

I really don't like using Scala's Array at all. I also don't like that your abstractions are at cross-purposes; in other words, the index in rdd2 is buried in various entries in rdd1. This makes things hard to reason about and also runs into a limitation of the Spark RDD API: you can't access one RDD while transforming another. I believe you should rewrite your current jobs to produce abstractions that are easier to work with.
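A minimal sketch of that limitation, using the rdd1 and rdd2 above (lookup is a PairRDDFunctions method):

// This does NOT work: an RDD cannot be used inside another RDD's
// transformation; Spark rejects nested RDD operations at runtime.
val broken = rdd1.map {
  case (index, array) => (index, rdd2.lookup(index)) // fails on the executors
}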

But if you must:

// Flip rdd1 so each neighbor's index becomes the key, carrying along
// the original point's index and the distance.
val flipped = rdd1.flatMap {
  case (index, array) =>
    array.toVector.map {
      case (neighborIndex, distance) => (neighborIndex, (index, distance))
    }
}

// Join on the neighbor's index to pick up its density, then regroup
// by the original point's index.
val result = flipped.join(rdd2)
  .map {
    case (neighborIndex, ((index, _), localReachabilityDensity)) =>
      (index, (neighborIndex, localReachabilityDensity))
  }
  .groupByKey()

The basic idea is to flip rdd1 to "extract" the neighborIndex values to the top level as the keys of the PairRDD, which then allows me to do a join with rdd2, and to replace Array with Vector. Once you do the join on the same indices, regrouping by the original point's index makes combining things much easier.
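If you really need the exact type from the question, a final (hypothetical) step can convert the grouped values back, although Iterable or Vector is usually the better abstraction:

// Optional: recover RDD[(Long, Array[(Long, Double)])] as originally asked.
val asArrays = result.mapValues(_.toArray)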

Note that this was off the top of my head and may not be perfect. The idea isn't so much to give you a solution to copy-paste as to suggest a different direction.

Upvotes: 1
