blue-sky

Reputation: 53786

Apache Spark lookup function

Reading def of lookup method from https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.PairRDDFunctions :

def lookup(key: K): Seq[V]
Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.

How can I ensure that the RDD has a known partitioner? I understand that an RDD is partitioned across nodes in a cluster, but what is meant by the statement "only searching the partition that the key maps to"?

Upvotes: 2

Views: 4212

Answers (3)

Daniel Darabos

Reputation: 27456

A Partitioner maps keys to partition indexes. If a key-value RDD is partitioned by a Partitioner, it means that each key is placed in the partition that is assigned to it by the Partitioner.

This is great for lookup! You can use the Partitioner to tell you the partition that this key belongs to, and then you only need to look at that partition of the RDD. (This can mean that the rest of the RDD does not even need to be computed!)

How can I ensure that the RDD has a known partitioner?

You can check that rdd.partitioner is not None. (Operations that need to locate keys, like groupByKey and join, partition the RDD for you.) You can use rdd.partitionBy to assign your own Partitioner and re-shuffle the RDD by it.
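A minimal sketch of both steps, checking `rdd.partitioner` and assigning one with `partitionBy` (the local SparkContext setup is illustrative; in a Spark shell, `sc` already exists):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Local context just for the demo.
val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("lookup-demo"))

// A freshly parallelized pair RDD has no partitioner.
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
println(rdd.partitioner)              // None

// partitionBy re-shuffles the data so every key sits in the partition
// its Partitioner assigns, and the result remembers that Partitioner.
val byKey = rdd.partitionBy(new HashPartitioner(4))
println(byKey.partitioner.isDefined)  // true

// lookup can now compute only the single partition "a" maps to.
println(byKey.lookup("a"))
```

After `partitionBy`, `lookup("a")` returns all values for that key (here 1 and 3) by evaluating just one partition instead of scanning the whole RDD.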

Upvotes: 1

gasparms

Reputation: 3354

Each RDD can, optionally, define a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned).

Indeed, some PairRDDFunctions let you specify the partitioner, frequently as the last parameter.

Or, if your RDD doesn't have a partitioner, you can use the partitionBy method to set one.

The lookup method goes directly to the right partition if your RDD already has a partitioner, or scans all the partitions in parallel if it doesn't.

Upvotes: 0

Justin Pihony

Reputation: 67065

A number of operations (especially on key-value pairs) automatically set up a partitioner when they are executed, as this can increase efficiency by cutting down on network traffic. For example (from PairRDDFunctions):

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
  }

Note the creation of a HashPartitioner. You can check the partitioner of your RDD if you want to see whether it has one, and you can also set one via partitionBy.
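To see this in action, here is a short sketch using reduceByKey, which, like the aggregateByKey overload above, installs a HashPartitioner on its output when given a partition count (the local SparkContext setup is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("partitioner-demo"))

// reduceByKey with a partition count shuffles with a HashPartitioner,
// and the resulting RDD remembers it.
val counts = sc
  .parallelize(Seq(("x", 1), ("y", 1), ("x", 1)))
  .reduceByKey(_ + _, 4)

println(counts.partitioner.isDefined)  // true

// So a later lookup only has to evaluate the one partition "x" maps to.
println(counts.lookup("x"))            // the single summed value for "x"
```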

Upvotes: 1
