Reputation: 5895
I have scoreTriplets, an RDD[Array[String]], which I am sorting in the following way:
// Bring every element to the driver (this is the expensive part)
val ScoreTripletsArray = scoreTriplets.collect()
if (ScoreTripletsArray.nonEmpty) {
  /* Sort ScoreTripletsArray in place, descending by the score field (index 3) */
  scala.util.Sorting.stableSort(ScoreTripletsArray, (e1: Array[String], e2: Array[String]) => e1(3).toInt > e2(3).toInt)
}
But collect() will be expensive if the RDD holds lakhs (hundreds of thousands) of elements.
So I need to sort the RDD by score without using collect().
scoreTriplets is an RDD[Array[String]]; each element of the RDD is an array holding the following fields, in this order:
EdgeId, sourceID, destID, score, sourceName, destName, distance
Please give me any reference or hint.
Upvotes: 7
Views: 31174
Reputation: 330413
Sorting will be an expensive operation even without collecting, because it requires a shuffle, but you can use the sortBy
method:
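import scala.util.Random
// Ten rows with three empty fields followed by a random score at index 3
val data = Seq.fill(10)(Array.fill(3)("") :+ Random.nextInt.toString)
val rdd = sc.parallelize(data)
// sortBy is ascending by default; the key is the score at index 3
val sorted = rdd.sortBy(_.apply(3).toInt)
sorted.take(3)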
// Array[Array[String]] = Array(
// Array("", "", "", -1660860558),
// Array("", "", "", -1643214719),
// Array("", "", "", -1206834289))
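The question asks for the highest scores first, while sortBy sorts ascending by default. RDD.sortBy takes an ascending flag, so rdd.sortBy(_.apply(3).toInt, ascending = false) should return rows in descending score order. Since running that needs a SparkContext, the sketch below checks the same key function and direction on a plain Scala Seq as a Spark-free stand-in; the object name DescendingByScore and the sample rows are hypothetical, and it assumes the score field always parses as an Int.

```scala
// Spark-free sketch: same key (score at index 3) and direction as
// rdd.sortBy(_.apply(3).toInt, ascending = false), applied to a local Seq.
object DescendingByScore {
  // Position of score in: EdgeId, sourceID, destID, score, sourceName, destName, distance
  val ScoreIdx = 3

  // Parse the score column; assumes it always holds a valid Int.
  def score(row: Array[String]): Int = row(ScoreIdx).toInt

  // Highest score first, by reversing the natural Int ordering
  // (avoids negating the key, which would overflow for Int.MinValue).
  def sortDescending(rows: Seq[Array[String]]): Seq[Array[String]] =
    rows.sortBy(row => score(row))(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows in the question's field layout
    val rows = Seq(
      Array("e1", "1", "2", "10", "a", "b", "0.5"),
      Array("e2", "2", "3", "42", "b", "c", "1.0"),
      Array("e3", "3", "1", "-7", "c", "a", "2.5")
    )
    sortDescending(rows).foreach(r => println(r.mkString(",")))
  }
}
```

On a real RDD the result of sortBy is still an RDD, so you can follow it with take(n) or save it without ever calling collect().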
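If you're interested only in the top results, then top and takeOrdered are usually preferred: they keep only the n best elements of each partition and merge those on the driver, so no full shuffle is needed (the result is a local array, so keep n small).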
import scala.math.Ordering
rdd.takeOrdered(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] =
// Array(Array("", "", "", -1660860558), Array("", "", "", -1643214719))
rdd.top(2)(Ordering.by[Array[String], Int](_.apply(3).toInt))
// Array[Array[String]] =
// Array(Array("", "", "", 1920955686), Array("", "", "", 1597012602))
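In Spark, top(n)(ord) is the opposite of takeOrdered(n)(ord): it is implemented as takeOrdered(n)(ord.reverse), so for the question's highest-scores-first case rdd.top(n)(byScore) is the direct answer. The sketch below defines that ordering once and reproduces both results on a local Seq, using sorted(...).take(n) as a stand-in for the RDD calls (it does not mimic Spark's per-partition bounded queue); the names TopByScore, byScore, topN and bottomN are hypothetical.

```scala
import scala.math.Ordering

// Spark-free sketch of the Ordering passed to rdd.top / rdd.takeOrdered.
object TopByScore {
  // Compare rows by the score at index 3, parsed as Int.
  val byScore: Ordering[Array[String]] =
    Ordering.by[Array[String], Int](row => row(3).toInt)

  // Local stand-in for rdd.top(n)(byScore): n highest scores, highest first.
  def topN(rows: Seq[Array[String]], n: Int): Seq[Array[String]] =
    rows.sorted(byScore.reverse).take(n)

  // Local stand-in for rdd.takeOrdered(n)(byScore): n lowest scores, lowest first.
  def bottomN(rows: Seq[Array[String]], n: Int): Seq[Array[String]] =
    rows.sorted(byScore).take(n)

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows in the question's field layout
    val rows = Seq(
      Array("e1", "1", "2", "10", "a", "b", "0.5"),
      Array("e2", "2", "3", "42", "b", "c", "1.0"),
      Array("e3", "3", "1", "-7", "c", "a", "2.5")
    )
    println(topN(rows, 2).map(_(0)).mkString(" "))
    println(bottomN(rows, 2).map(_(0)).mkString(" "))
  }
}
```

Declaring byScore as an implicit val in scope would let rdd.top(n) and rdd.takeOrdered(n) pick it up without passing it explicitly.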
Upvotes: 9