Reputation: 13
I'm trying to compute the Canberra distance between two distinct RDDs in Apache Spark. The RDDs have the same dimension and are not particularly big.
Does anyone have any suggestions on the best approach to do this within the RDD API? The equation for the Canberra distance is described in the link below.
Canberra Distance Between Two Vectors
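For reference, the distance between two vectors x and y is the sum over their components: d(x, y) = Σᵢ |xᵢ − yᵢ| / (|xᵢ| + |yᵢ|).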
Upvotes: 1
Views: 192
Reputation: 26
You will need to create an index for each RDD and then join them on it. You can then map over the joined pairs to compute the term for each pair of components, and reduce to get the total sum.
The following should work:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// I am assuming here that your vectors are initially stored as arrays of Double
val dataX = sc.parallelize(Array(11.0, 12.0, 13.0, 14.0, 15.0))
val dataY = sc.parallelize(Array(21.0, 22.0, 23.0, 24.0, 25.0))
def canberraDist(sc: SparkContext, X: RDD[Double], Y: RDD[Double]): Double = {

  // Pair each element with its position. zipWithIndex returns (value, index),
  // so swap the order to (index, value) so the join keys on position.
  val RDDX = X.zipWithIndex().map { case (value, index) => (index, value) }
  val RDDY = Y.zipWithIndex().map { case (value, index) => (index, value) }

  // Join the two RDDs on the index. Returns: RDD[(Long, (Double, Double))]
  val RDDJoined = RDDX.join(RDDY)

  // Sum the per-component Canberra terms, |x - y| / (|x| + |y|).
  // When both components are zero the term is taken as 0 to avoid 0/0.
  RDDJoined.map { case (_, (x, y)) =>
    if (x == 0.0 && y == 0.0) 0.0
    else math.abs(x - y) / (math.abs(x) + math.abs(y))
  }.reduce(_ + _)
}
val totalDist = canberraDist(sc, dataX, dataY)
println(totalDist)
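As an aside, if both RDDs are built the same way (same number of partitions and the same number of elements per partition, which RDD.zip requires), you can skip the index-and-join step entirely. A minimal sketch under that assumption:

def canberraDistZip(X: RDD[Double], Y: RDD[Double]): Double =
  // zip pairs elements positionally; it fails at runtime if the
  // partitioning assumption above does not hold.
  X.zip(Y)
    .map { case (x, y) =>
      if (x == 0.0 && y == 0.0) 0.0 // avoid 0/0 producing NaN
      else math.abs(x - y) / (math.abs(x) + math.abs(y))
    }
    .reduce(_ + _)

The join version is safer for RDDs of unknown provenance, since it only assumes equal length; zip just avoids the shuffle when the layout is known.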
Upvotes: 1