Reputation: 53786
Reading the definition of the lookup method from https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.PairRDDFunctions :
def lookup(key: K): Seq[V]

Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.
How can I ensure that the RDD has a known partitioner? I understand that an RDD is partitioned across nodes in a cluster, but what is meant by the statement "only searching the partition that the key maps to"?
Upvotes: 2
Views: 4212
Reputation: 27456
A Partitioner maps keys to partition indexes. If a key-value RDD is partitioned by a Partitioner, it means that each key is placed in the partition that is assigned to it by the Partitioner.

This is great for lookup! You can use the Partitioner to tell you the partition that this key belongs to, and then you only need to look at that partition of the RDD. (This can mean that the rest of the RDD does not even need to be computed!)
How can I ensure that the RDD has a known partitioner?
You can check that rdd.partitioner is not None. (Operations that need to locate keys, like groupByKey and join, partition the RDD for you.) You can use rdd.partitionBy to assign your own Partitioner and re-shuffle the RDD by it.
Upvotes: 1
Reputation: 3354
Each RDD can, optionally, define a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned).
Indeed, in some PairRDDFunctions methods you can specify the partitioner, frequently as the last parameter.
If your RDD doesn't have a partitioner, you can use the partitionBy method to set one.
The lookup method goes directly to the right partition if your RDD already has a partitioner, or scans all the partitions in parallel if it doesn't.
Upvotes: 0
Reputation: 67065
A number of operations (especially on key-value pairs) automatically set up a partitioner when they are executed, as this can increase efficiency by cutting down on network traffic. For example (from PairRDDFunctions):
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}
Note the creation of a HashPartitioner. You can check the partitioner of your RDD if you want to see if it has one. You can also set one via partitionBy.
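For reference, the core of hash partitioning is just a non-negative modulo of the key's hashCode over the number of partitions. This is a simplified sketch of that rule, not Spark's exact HashPartitioner source (which also handles details like null keys):

```scala
// Simplified hash-partitioning rule: equal keys always land in the same
// partition, which is what lets keyed operations colocate their values.
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // % can be negative in Scala/Java
}
```

Because the result is deterministic for a given key and partition count, any operation that knows the partitioner (lookup included) can compute the target partition index without touching the data.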
Upvotes: 1