blue-sky

Reputation: 53786

Converting a Scala method to Spark

The Scala method below returns the k nearest neighbours for a given label from an Array of pairwise distances:

  def getNearestNeighbours(distances: Array[((String, String), Double)], k: Int, label: String): Array[((String, String), Double)] =
    distances.filter(v => v._1._1.equals(label) || v._1._2.equals(label)).sortBy(_._2).take(k)

I want to run this function in parallel. I can try converting the Array to an RDD, but the RDD type does not support the chained calls .sortBy(_._2).take(k). Is there a way to emulate this method in Spark/Scala?

A possible solution is to modify the method so that the RDD is collected to an Array every time the method is called, but I think this would be computationally expensive for large RDDs:

  def getNearestNeighbours(distances: RDD[((String, String), Double)], k: Int, label: String): Array[((String, String), Double)] =
    distances.collect().filter(v => v._1._1.equals(label) || v._1._2.equals(label)).sortBy(_._2).take(k)

Upvotes: 0

Views: 206

Answers (2)

Daniel Darabos

Reputation: 27455

Do not collect the RDD. It pulls all the data to one machine. Change your input so it is keyed by the negative distance (RDD[(Double, (String, String))]) and then use RDD.top(k).
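A minimal sketch of that approach, assuming the input keeps the shape from the question (the method name and signature here are illustrative, not part of the answer):

  import org.apache.spark.rdd.RDD

  def getNearestNeighbours(distances: RDD[((String, String), Double)],
                           k: Int,
                           label: String): Array[((String, String), Double)] =
    distances
      .filter { case ((a, b), _) => a == label || b == label }
      .map { case (pair, d) => (-d, pair) }         // key by negated distance
      .top(k)                                       // largest keys = smallest distances, computed in parallel
      .map { case (negD, pair) => (pair, -negD) }   // restore the original shape

RDD.top(k) keeps a bounded number of elements per partition and merges the results, so no full sort and no collect of the whole dataset is needed.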

Upvotes: 2

Alexey Romanov

Reputation: 170713

RDD does have a sortByKey method, which sorts RDDs of pairs by the first element, so if you can create an RDD[(Double, (String, String))] instead of an RDD[((String, String), Double)] (or just call rdd.map(p => (p._2, p._1))), you can translate the algorithm directly. It also has take, but the documentation says:

Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.

So I wouldn't expect this to work well.
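For reference, the direct translation described above would look roughly like this (a sketch only, given the caveat about take; the method name is illustrative):

  import org.apache.spark.rdd.RDD

  def getNearestNeighbours(distances: RDD[((String, String), Double)],
                           k: Int,
                           label: String): Array[(Double, (String, String))] =
    distances
      .filter { case ((a, b), _) => a == label || b == label }
      .map { case (pair, d) => (d, pair) }  // key by distance
      .sortByKey(ascending = true)          // distributed sort on the Double keys
      .take(k)                              // the first k rows are gathered by the driver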

Besides, if the data fits on one machine, just working with Arrays (or parallel collections) is likely to be faster. Spark does what it can to minimize overhead, but distributed sorting is going to have some overhead anyway!
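For example, a single-machine variant with parallel collections might look like this (a sketch; on Scala 2.13+ the .par conversion additionally requires the scala-parallel-collections module):

  def getNearestNeighbours(distances: Array[((String, String), Double)],
                           k: Int,
                           label: String): Array[((String, String), Double)] =
    distances.par                                               // filter in parallel across cores
      .filter { case ((a, b), _) => a == label || b == label }
      .toArray                                                  // back to a sequential Array for sorting
      .sortBy(_._2)
      .take(k)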

In addition, sorting the entire array/RDD/other collection when you just need the least n elements is a bad idea (again, especially in cases where you'd want to use Spark). You need a selection algorithm like the ones described in Worst-case O(n) algorithm for doing k-selection or In an integer array with N elements, find the minimum k elements?. Unfortunately, they aren't available in the Scala standard library or in Spark (that I know of).
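To illustrate the idea, here is a minimal quickselect sketch for plain Scala arrays (a textbook algorithm written out by hand, not a library API):

  import scala.util.Random

  // Expected O(n) selection of the k smallest elements (returned unsorted),
  // versus O(n log n) for a full sort.
  def kSmallest[T](xs: Array[T], k: Int)(implicit ord: Ordering[T]): Array[T] = {
    val a = xs.clone()
    def swap(i: Int, j: Int): Unit = { val t = a(i); a(i) = a(j); a(j) = t }

    // Lomuto partition with a random pivot; returns the pivot's final index.
    def partition(lo: Int, hi: Int): Int = {
      swap(lo + Random.nextInt(hi - lo + 1), hi)
      val pivot = a(hi)
      var store = lo
      for (i <- lo until hi) if (ord.lt(a(i), pivot)) { swap(i, store); store += 1 }
      swap(store, hi)
      store
    }

    if (k > 0 && k < a.length) {
      var lo = 0
      var hi = a.length - 1
      var done = false
      while (lo < hi && !done) {
        val p = partition(lo, hi)
        if (p == k) done = true      // a(0 until k) now hold the k smallest
        else if (p < k) lo = p + 1
        else hi = p - 1
      }
    }
    a.take(k)
  }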

Upvotes: 1
