java_enthu
java_enthu

Reputation: 2327

Random Partitioner behavior on the joined RDD

I am trying to join two data sets. One of type (Id, salesRecord) another (Id,Name). First data-set is partitioned by HashPartitioner and Second is partitioned by Custom Partitioner. When I join these RDDs by id and try to see which partition-er information is retained I randomly see that some times joinRDD displays custom partitioner and sometimes HashPartitioner. I received different partioner results while changing the number of partitions also.

According to the Learning Spark book, rdd1.join(rdd2) retains the partition info from the rdd1.

Here is the code.

  val hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(new HashPartitioner(10))
println("hashPartitionedRDD's partitioner " + hashPartitionedRDD.partitioner) // Seeing Instance of HashParitioner

val customPartitionedRDD = customerIdNamePair1.partitionBy(new CustomerPartitioner)
println("customPartitionedRDD partitioner " + customPartitionedRDD.partitioner) // Seeing instance of CustomPartitioner

val expectedHash = hashPartitionedRDD.join(customPartitionedRDD)
val expectedCustom = customPartitionedRDD.join(hashPartitionedRDD)

println("Expected Hash " + expectedHash.partitioner) // Seeing instance of Custom Partitioner
println("Expected Custom " + expectedCustom.partitioner) //Seeing instance of Custom Partitioner

// Just to add more to it when number of partitions of both the data sets I made equal I am seeing the reverse results. i.e. 
// expectedHash shows CustomPartitioner and 
// expectedCustom shows Hashpartitioner Instance.

Upvotes: 2

Views: 412

Answers (1)

Mohitt
Mohitt

Reputation: 2977

The join method internally calls, Partitioner.defaultPartitioner() method.

Based on definition of defaultPartitioner:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}

If you look closely in line:

val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse

It starts the for-loop (or search) based on number of partitions in descending order. So if both RDDs have their own partitioners, the one with higher number of partitions will be chosen.

EDIT: The issue you raised about seeing reverse behavior is quite simple. Here, if both have same number of partitions, the others will come at the top of the Seq. So, the partitioner of the argument RDD will be chosen.

(Seq(rdd) ++ others).sortBy(_.partitions.size).reverse

This behavior is explainable, but perhaps not intuitive.

Upvotes: 4

Related Questions