Reputation: 2327
I am trying to join two datasets, one of type (Id, salesRecord) and the other of type (Id, Name). The first dataset is partitioned by a HashPartitioner and the second by a custom partitioner. When I join these RDDs by id and check which partitioner the result retains, the outcome looks random: sometimes the joined RDD reports the custom partitioner and sometimes the HashPartitioner. The result also changes when I change the number of partitions.
According to the Learning Spark book, rdd1.join(rdd2) retains the partitioning information of rdd1.
Here is the code.
val hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(new HashPartitioner(10))
println("hashPartitionedRDD's partitioner " + hashPartitionedRDD.partitioner) // Seeing instance of HashPartitioner
val customPartitionedRDD = customerIdNamePair1.partitionBy(new CustomerPartitioner)
println("customPartitionedRDD partitioner " + customPartitionedRDD.partitioner) // Seeing instance of CustomPartitioner
val expectedHash = hashPartitionedRDD.join(customPartitionedRDD)
val expectedCustom = customPartitionedRDD.join(hashPartitionedRDD)
println("Expected Hash " + expectedHash.partitioner) // Seeing instance of Custom Partitioner
println("Expected Custom " + expectedCustom.partitioner) //Seeing instance of Custom Partitioner
// In addition, when I make the number of partitions of both datasets equal, I see the reverse results, i.e.
// expectedHash shows a CustomPartitioner instance and
// expectedCustom shows a HashPartitioner instance.
Upvotes: 2
Views: 412
Reputation: 2977
The join method internally calls Partitioner.defaultPartitioner().
Based on the definition of defaultPartitioner:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}
Look closely at this line:
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
As a result, the for-loop (or search) examines the RDDs in descending order of partition count. So if both RDDs have their own partitioners, the one with the higher number of partitions is chosen.
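The selection rule can be tried out without a Spark cluster. What follows is only a simplified model, not Spark's code: FakeRdd and pickPartitioner are hypothetical names, the partitioner is reduced to an Option[String], the numPartitions > 0 check is left out, and the count of 20 for the custom-partitioned side is an assumption.

```scala
// Simplified stand-in for an RDD: only the two properties that matter here.
// FakeRdd and pickPartitioner are hypothetical names, not part of Spark.
final case class FakeRdd(numPartitions: Int, partitioner: Option[String])

object PartitionerChoice {
  // Same shape as the quoted defaultPartitioner: sort ascending by partition
  // count, reverse, then return the first partitioner that is defined.
  def pickPartitioner(rdd: FakeRdd, others: FakeRdd*): Option[String] = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.numPartitions).reverse
    bySize.find(_.partitioner.isDefined).flatMap(_.partitioner)
  }

  val hashRdd   = FakeRdd(10, Some("HashPartitioner"))
  val customRdd = FakeRdd(20, Some("CustomPartitioner")) // 20 is an assumed count

  // With different partition counts, the larger side wins in either join order.
  val hashJoinCustom: Option[String] = pickPartitioner(hashRdd, customRdd)
  val customJoinHash: Option[String] = pickPartitioner(customRdd, hashRdd)
}
```

Under these assumptions both calls return the custom partitioner, which matches the output in the question when the custom-partitioned RDD has more than 10 partitions.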
EDIT: The reverse behavior you observed has a simple explanation. If both RDDs have the same number of partitions, the sort (which is stable) leaves rdd ahead of others, and reverse then puts others at the top of the Seq. So the partitioner of the argument RDD is chosen.
(Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
This behavior is explainable, but perhaps not intuitive.
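The tie case comes down to a stable sort followed by reverse. Here is a minimal sketch in plain Scala, using (label, partitionCount) pairs as hypothetical stand-ins for the two RDDs; the labels and the count of 10 are illustrative only.

```scala
object TieBreak {
  // (label, partition count); both sides have the same count.
  val receiver = ("rdd", 10)    // stands in for the RDD that join is called on
  val argument = ("other", 10)  // stands in for the RDD passed to join

  // sortBy is stable, so equal keys keep their input order: receiver, argument.
  val sorted = Seq(receiver, argument).sortBy(_._2)

  // reverse flips that order, so the argument is examined first.
  val bySize = sorted.reverse
  val firstExamined: String = bySize.head._1
}
```

Since the search returns the first defined partitioner it meets, the side examined first decides the result when the counts are equal.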
Upvotes: 4