Marco

Reputation: 2235

Functional approach in sequential RDD processing [Apache Spark]

I have an RDD backed by an HBase table. Each row (key) represents a GPS location. I've written a function that calculates the distance between two points; it should be called with the current row and its predecessor [i-1].

Now I'm struggling to get this done in a functional way with RDD functions so that I can parallelize it.

My quick and dirty approach is to first build an array of (key, predecessor, current) tuples on the driver:

val rows = rdd.collect()
val rowCount = rdd.count() - 1 // the first row has no predecessor, hence no distance
val rowArray = new Array[(String, Point, Point)](rowCount.toInt)
var predecessorPoint: Point = null // point of the previous row, null until the first row is seen
var i = 0 //can be better solved in scala, I know ;)

rows.foreach(row => {
  if (predecessorPoint == null) {
    predecessorPoint = getPointByRow(row._2)
  }
  else {
    val currentPoint = getPointByRow(row._2)
    rowArray(i) = Tuple3(row._1, predecessorPoint, currentPoint)

    i += 1
    predecessorPoint = currentPoint
  }
})

return rowArray

Then I parallelize that array and calculate the distances:

//create a parallel-enabled data set from the (key, predecessor, current) tuples
val parallelDataSet = sc.parallelize(rowArray)

parallelDataSet.foreach(row => {
  Functions.logDistance(row)
})

That works but it's ugly and surely inefficient.

My idea now was to use rdd.reduce() to get rid of the foreach loop, and this might work if the distance function handles the issue that the ordering of (a+b) is not guaranteed.

Anyway, is there a better solution? My understanding is that there is no (efficient) way to access elements by index when working with RDDs.

Thanks.

Upvotes: 3

Views: 1058

Answers (1)

maasg

Reputation: 37435

Given that ordering is key here, a good way to proceed could be to first index the RDD. Then, using the index we can simulate a zip and have the tuples partitioned over the cluster. Something like this:

val indexed = rdd.zipWithIndex.map(_.swap) // (index, value)
val shifted = indexed.map{case (k,v) => (k-1,v)} // value i+1 is now keyed by i
val joined = indexed.join(shifted) // (i, (value i, value i+1))
val distanceRDD = joined.map{case (k,(v1,v2)) => distanceFunction(v1,v2)}

(*) example code - not tested
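
For anyone who wants to try the index-and-join idea end to end, here is a minimal sketch that runs on a local master. The `Point` case class, the Euclidean `distance` function and the sample coordinates are hypothetical stand-ins (the question's real point type and GPS distance function are not shown), and the input is a small in-memory sequence rather than an HBase-backed RDD. Note that `zipWithIndex` numbers elements in partition order, so the RDD has to be in the intended order already (for example sorted by row key) for "predecessor" to mean anything.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConsecutiveDistances {
  // Hypothetical stand-in for the question's Point type.
  case class Point(x: Double, y: Double)

  // Hypothetical distance (plain Euclidean); replace with the real GPS distance.
  def distance(a: Point, b: Point): Double = math.hypot(b.x - a.x, b.y - a.y)

  // Returns (index of the earlier point, distance to its successor), ordered by index.
  def consecutiveDistances(sc: SparkContext, points: Seq[Point]): Array[(Long, Double)] = {
    val rdd = sc.parallelize(points, 2)
    val indexed = rdd.zipWithIndex.map(_.swap)              // (i, point i)
    val shifted = indexed.map { case (i, p) => (i - 1, p) } // (i - 1, point i)
    // Key i pairs point i with point i+1; the last point finds no partner and drops out.
    indexed.join(shifted)
      .mapValues { case (prev, next) => distance(prev, next) }
      .sortByKey() // the join does not preserve order, so sort before collecting
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("consecutive-distances")
    val sc = new SparkContext(conf)
    try {
      val pts = Seq(Point(0, 0), Point(3, 4), Point(3, 10))
      consecutiveDistances(sc, pts).foreach { case (i, d) => println(s"$i -> $d") }
    } finally {
      sc.stop()
    }
  }
}
```

The join shuffles the data once, which is the price for pairing neighbours that may sit in different partitions; in exchange nothing has to be collected to the driver before the distances are computed.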

Upvotes: 2
