RobertFrobisher

Reputation: 13

Computing the Canberra distance with Apache Spark

I'm trying to compute the Canberra distance between two distinct RDDs in Apache Spark. The RDDs have the same dimension and are not particularly big.

Does anyone have any suggestions about the best approach to do this with the RDD API? The equation for the Canberra distance can be seen at the link below.

Canberra Distance Between Two Vectors
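
For reference, the Canberra distance between two n-dimensional vectors x and y is

d(x, y) = \sum_{i=1}^{n} \frac{|x_i - y_i|}{|x_i| + |y_i|}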

Upvotes: 1

Views: 192

Answers (1)

JohnB

Reputation: 26

You will need to create an index for each RDD and then join them on it. That lets you map over each aligned pair of components to compute its term of the sum, and then reduce to get the total.

The following should work:

// I am assuming here that your vectors are initially stored as RDDs of Double
import org.apache.spark.rdd.RDD

val dataX = sc.parallelize(Array(11.0, 12.0, 13.0, 14.0, 15.0))
val dataY = sc.parallelize(Array(21.0, 22.0, 23.0, 24.0, 25.0))

def canberraDist(X: RDD[Double], Y: RDD[Double]): Double = {
  // Index each element by its position. zipWithIndex returns (value, index),
  // so swap each pair to (index, value) so the join keys on the index.
  val RDDX = X.zipWithIndex().map(x => (x._2, x._1))
  val RDDY = Y.zipWithIndex().map(x => (x._2, x._1))

  // Join the two RDDs on the index. Returns: RDD[(Long, (Double, Double))]
  val RDDJoined = RDDX.join(RDDY)

  // Sum the Canberra terms. When both components are zero the term would be
  // 0/0; by the usual convention that term is taken as 0, so guard for it.
  RDDJoined.map { case (id, (x, y)) =>
    val denom = math.abs(x) + math.abs(y)
    if (denom == 0.0) 0.0 else math.abs(x - y) / denom
  }.reduce(_ + _)
}

val totalDist = canberraDist(dataX, dataY)
println(totalDist)
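
As a side note, if you know your two RDDs have identical partitioning (the same number of partitions and the same number of elements per partition, which holds for the two parallelize calls above), RDD.zip pairs the elements directly and avoids the indexing and join shuffle. A minimal sketch under that assumption:

// Sketch: assumes X and Y have the same number of partitions
// and the same number of elements in each partition,
// which RDD.zip requires.
def canberraDistZip(X: RDD[Double], Y: RDD[Double]): Double =
  X.zip(Y).map { case (x, y) =>
    val denom = math.abs(x) + math.abs(y)
    if (denom == 0.0) 0.0 else math.abs(x - y) / denom
  }.reduce(_ + _)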

Upvotes: 1
