Sugimiyanto

Reputation: 329

What is an alternative, faster way to look up an element in an RDD?

I am new to Scala and Spark. This is a simplified example of my whole code:

package trouble.something

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Stack {
  def ExFunc2(looku: RDD[(Int, List[(Double, Int)])], ke: Int): Seq[List[(Double, Int)]] = {
    val y: Seq[List[(Double, Int)]] = looku.lookup(ke)
    val g = y.map{x =>
      x
      /* some functions here
      .
      .
       */
    }
    g
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("toy")
    val sc = new SparkContext(conf)

    val pi: RDD[(Int, List[(Double, Int)])] = sc.parallelize(Seq((1, List((9.0, 3), (7.0, 2))), (2, List((7.0, 1), (1.0, 3))), (3, List((1.0, 2), (9.0, 1)))))
    val res = ExFunc2(pi, 1)
    println(res)
  }
}

I am running this on a fairly large dataset and need better performance. Judging by Spark's web UI and a software profiler, the most time-consuming part is the lookup() function:

 val y: Seq[List[(Double, Int)]] = looku.lookup(ke)

What is an alternative, faster way to look up an element in an RDD than the lookup() function?

There is a related discussion, Spark: Fastest way to look up an element in an RDD, but it does not give me any ideas.

Upvotes: 2

Views: 1349

Answers (1)

soote

Reputation: 3260

You should not have performance issues with the lookup function if you use and scale it carefully.

def lookup(key: K): Seq[V]

Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.

By default, functions which generate a PairRdd use the HashPartitioner, so check what your spark.default.parallelism value is set to, since this is the number of partitions that the HashPartitioner will default to. You can tune that parameter to match the number of executors * cores per executor you are using.
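For example, a minimal sketch of setting and checking this value (the value 16 is an assumption; substitute your own executors * cores):

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: 4 executors * 4 cores per executor = 16
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("toy")
  .set("spark.default.parallelism", "16")
val sc = new SparkContext(conf)

// The number of partitions the HashPartitioner will default to
println(sc.defaultParallelism)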

You should confirm that your PairRdd does in fact have a known partitioner, and if it does not, use partitionBy to create one, or modify your existing code to use a HashPartitioner when the PairRdd is created.
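For instance, a minimal sketch based on the question's code, assuming a parallelismFactor of 16 (tune this to your own executors * cores):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Assumption: parallelismFactor = # of executors * # of cores per executor
val parallelismFactor = 16

// Give the pair RDD a known HashPartitioner and cache it, so the shuffle
// from partitionBy is paid once and every subsequent lookup() only scans
// the single partition the key hashes to.
val pi: RDD[(Int, List[(Double, Int)])] = sc
  .parallelize(Seq(
    (1, List((9.0, 3), (7.0, 2))),
    (2, List((7.0, 1), (1.0, 3))),
    (3, List((1.0, 2), (9.0, 1)))))
  .partitionBy(new HashPartitioner(parallelismFactor))
  .persist()

// lookup() now searches only the partition that key 1 maps to
val y: Seq[List[(Double, Int)]] = pi.lookup(1)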

let parallelismFactor = # of executors * # of cores per executor

Then if the lookup function is still too slow, you will need to increase the parallelismFactor you are using. Now Spark will know which partition to look up in, and as you increase the parallelismFactor, you will reduce the size of each partition, which will increase the speed of the lookup.

Keep in mind that you may wish to have many times more partitions than executors * cores; you will have to benchmark your use case yourself, trying values from 1 to 10 times more partitions than executors * cores.
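A rough benchmarking sketch, reusing pi and the HashPartitioner import from the sketch above (the core count and the timing approach are assumptions, not part of the original answer):

val coresTotal = 16  // assumption: executors * cores per executor

for (multiple <- 1 to 10) {
  val candidate = pi.partitionBy(new HashPartitioner(coresTotal * multiple)).persist()
  candidate.count()                    // materialize the repartitioned, cached RDD
  val start = System.nanoTime()
  candidate.lookup(1)                  // the operation being measured
  val elapsedMs = (System.nanoTime() - start) / 1e6
  println(s"${coresTotal * multiple} partitions: $elapsedMs ms")
  candidate.unpersist()
}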

Upvotes: 3
