Reputation: 315
I am using HashPartitioner but getting an unexpected result.
I am using 3 different Strings as keys and passing 3 as the number of partitions, so I expect 3 partitions.
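import org.apache.spark.HashPartitioner

val cars = Array("Honda", "Toyota", "Kia")
// Build every (car, price) pair and spread the pairs over 8 initial partitions
val carnamePrice = sc.parallelize(for {
  x <- cars
  y <- Array(100, 200, 300)
} yield (x, y), 8)
// Repartition by key hash into 3 partitions
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3))
// Tag every record with the index of the partition it ended up in
val mapped = rddEachCar.mapPartitionsWithIndex {
  (index, iterator) => {
    println("Called in Partition -> " + index)
    val myList = iterator.toList
    myList.map(x => x + " -> " + index).iterator
  }
}
mapped.take(10)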
The result is below: the keys end up in only 2 partitions. I checked the hash codes of the Strings (69909220, 75427, -1783892706). What could be the problem here? I have probably misunderstood the partitioning algorithm.
Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Kia,100) -> 1, (Kia,200) -> 1, (Kia,300) -> 1)
Upvotes: 2
Views: 737
Reputation: 330193
There is nothing strange going on here. Utils.nonNegativeMod, which is used by HashPartitioner, is implemented as follows:
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
With 3 partitions the key distribution is defined as shown below:
for { car <- Seq("Honda", "Toyota", "Kia") }
  yield (car -> nonNegativeMod(car.hashCode, 3))
Seq[(String, Int)] = List((Honda,1), (Toyota,0), (Kia,1))
which is exactly what you get in your case. In other words, the lack of a direct hash collision doesn't guarantee the lack of a collision modulo an arbitrary number.
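If you want to check the arithmetic without a Spark cluster, here is a minimal sketch in plain Scala. It copies the nonNegativeMod function quoted above so that it runs on its own; the object name HashModDemo and the helper assignments are hypothetical names I made up for illustration and are not part of Spark.

```scala
object HashModDemo {
  // Same arithmetic as the nonNegativeMod quoted above, copied here
  // so the sketch does not need Spark on the classpath.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  val cars = Seq("Honda", "Toyota", "Kia")

  // Hypothetical helper: maps each key to the partition index it would
  // get with n partitions, using hashCode followed by nonNegativeMod.
  def assignments(n: Int): Map[String, Int] =
    cars.map(car => car -> nonNegativeMod(car.hashCode, n)).toMap

  def main(args: Array[String]): Unit = {
    val byThree = assignments(3)
    cars.foreach { car =>
      println(s"$car hashCode=${car.hashCode} partition=${byThree(car)}")
    }
    // Three distinct hash codes, but only two distinct remainders modulo 3.
    println(s"distinct partitions used: ${byThree.values.toSet.size}")
  }
}
```

Calling HashModDemo.assignments with other values shows how the layout changes with the partition count. For these three keys, 4 happens to give three different indices (Honda 0, Toyota 2, Kia 3), but that is a property of these particular hash codes and not something a hash-based partitioner guarantees.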
Upvotes: 2