Reputation: 1194
I am trying to implement Local Outlier Factor on Spark. I have a set of points that I read from a file, and then for each point I find its N nearest neighbors. Each point has an index assigned to it using the zipWithIndex() command.
So now I have two RDDs. First:
RDD[(Index:Long, Array[(NeighborIndex:Long, Distance:Double)])]
Where the Long represents the point's index, and the Array consists of its N nearest neighbors, with each neighbor's Long representing its index and the Double representing its distance from the given point.
Second:
RDD[(Index:Long,LocalReachabilityDensity:Double)]
Here, the Long again represents the index of a given point, and the Double represents its local reachability density.
What I want is an RDD that contains all the points, each with an array of its N closest neighbors and their local reachability densities:
RDD[(Index:Long, Array[(NeighborIndex:Long,LocalReachabilityDensity:Double)])]
So basically, the Long here would represent the index of a point, and the array would contain its N closest neighbors, with their index values and local reachability densities.
As I understand it, I need to run a map on the first RDD and then join the values in its array with the second RDD, which contains the local reachability densities, to get the local reachability density for each of the indexes of its N neighbors. But I am not sure how to achieve this. If anyone can help me out, that would be great.
Upvotes: 0
Views: 672
Reputation: 30300
Given:
// point index -> its N nearest neighbors as (neighborIndex, distance) pairs
val rdd1: RDD[(Long, Array[(Long, Double)])] = ...
// point index -> local reachability density
val rdd2: RDD[(Long, Double)] = ...
I really don't like using Scala's Array at all. I also don't like that your abstractions are at cross purposes; in other words, the index in rdd2 is buried in various entries in rdd1. This makes things hard to reason about and also incurs the limitations of the Spark RDD API, where you can't access a second RDD while transforming the first. I believe you should rewrite your current jobs to produce abstractions that are easier to work with.
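For example, a minimal sketch of what an easier shape could look like, assuming a hypothetical upstream job and the made-up name neighborEdges: if the neighbor computation emitted one record per (point, neighbor) pair keyed by the neighbor's index, it would share a key with rdd2 and no reshaping step would be needed before the join:
// Hypothetical shape: one record per (point, neighbor) edge, keyed by the
// neighbor's index, i.e. (neighborIndex, (pointIndex, distance)).
val neighborEdges: RDD[(Long, (Long, Double))] = ???
// With matching keys, attaching each neighbor's density is a plain join.
val withDensity: RDD[(Long, ((Long, Double), Double))] = neighborEdges.join(rdd2)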
But if you must:
val flipped = rdd1.flatMap {
  case (index, array) =>
    // flip each neighbor entry so neighborIndex becomes the key
    array.map {
      case (neighborIndex, distance) => (neighborIndex, (index, distance))
    }.toVector
}.groupByKey()
val result = flipped.join(rdd2).mapValues {
  case (indexDistances, localReachabilityDensity) =>
    // pair each referring point's index with this neighbor's density
    indexDistances.map {
      case (index, _) => (index, localReachabilityDensity)
    }
}
The basic idea is to flip rdd1 to "extract" the neighborIndex values to the top level as the keys of the PairRDD, which then allows me to do a join with rdd2. And to replace Array with Vector. Once you do the join on the same indices, combining things is much easier.
Note that this was off the top of my head and may not be perfect. The idea isn't so much to give you a solution to copy and paste as to suggest a different direction.
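If you need the exact shape asked for in the question (point index mapped to an array of (neighborIndex, density) pairs), a rough, untested sketch building on the result above would be one more flip and regroup:
// result is keyed by neighborIndex; flip back so the original point index
// is the key again, then regroup into the requested Array shape.
val byPoint = result
  .flatMap {
    case (neighborIndex, pairs) =>
      pairs.map {
        case (pointIndex, density) => (pointIndex, (neighborIndex, density))
      }
  }
  .groupByKey()
  .mapValues(_.toArray)
// byPoint: RDD[(Long, Array[(Long, Double)])]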
Upvotes: 1