Reputation: 2234
Our reduceByKey operation produces an RDD that is quite skewed, with lots of data in one or two partitions. To increase the parallelism of the processing after the reduceByKey, we do a repartition, which forces a shuffle.
rdd.reduceByKey(_+_).repartition(64)
I know that it is possible to pass a Partitioner into the reduceByKey operation. But (other than writing a custom one) I think the options are the HashPartitioner and the RangePartitioner, and I think both of these will result in the data being skewed after they partition, since the keys are quite unique.
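For concreteness, here is roughly what I mean by passing a partitioner (just a sketch, assuming an RDD[(String, Int)] called rdd):
import org.apache.spark.HashPartitioner
// Sketch: pass an explicit partitioner into reduceByKey instead of repartitioning afterwards
val reduced = rdd.reduceByKey(new HashPartitioner(64), _ + _)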
Is it possible to shuffle the output RDD of a reduceByKey evenly, without the additional repartition call?
Upvotes: 1
Views: 994
Reputation: 771
Your assumption: "I think the options are the HashPartitioner and the RangePartitioner. And I think both of these will result in the data being skewed after they partition, since the keys are quite unique." is not necessarily correct.
The standard RangePartitioner tries to distribute the data evenly by sampling the keys. In our case, with quite skewed data (the skew depends on the geographical distribution of people), it turned out to work very well and distributed the data evenly.
See also the explanation here: How does range partitioner work in Spark?
So I would recommend using the standard RangePartitioner in your reduceByKey.
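For example (just a sketch, assuming your RDD is called rdd and its keys have an Ordering):
import org.apache.spark.RangePartitioner
// Sketch: the RangePartitioner samples the keys of rdd to compute range bounds,
// and reduceByKey then shuffles directly into those ranges
val partitioner = new RangePartitioner(64, rdd)
val reduced = rdd.reduceByKey(partitioner, _ + _)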
Write a small test or logging output which checks the size of your partitions.
You can use mapPartitionsWithIndex to collect some statistics about your partitions and have an id for the logging output.
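Something like this (a sketch, assuming the reduced RDD from above is called reduced):
// Sketch: count the records per partition and print them together with the partition id
reduced
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()
  .foreach { case (idx, count) => println(s"partition $idx: $count records") }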
Upvotes: 2
Reputation: 619
I am assuming that, after the reduceByKey, your keys are clustered in certain ranges, for example 1, 2, 3, 4, 5, 45, 46, 47, 48, 49, 356, 789.
So you will get skewed data when you use a RangePartitioner. HashPartitioner, on the other hand, depends on the hash function used to calculate the hash of the keys
(Scala RDDs use hashCode, Datasets use MurmurHash 3, and PySpark uses portable_hash).
Now, if the default HashPartitioner also gives you data skew, then you will have to write a custom Partitioner and hash the keys with a function that produces a well-spread hash, e.g. MD5.
To implement a custom Partitioner in Spark you will have to extend Partitioner and implement 3 methods:
numPartitions
equals
getPartition
In the getPartition method you can calculate an MD5 hash of the key and distribute the keys according to the numPartitions parameter.
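A minimal sketch of such a partitioner (assuming the keys can be serialized via toString; the class name Md5Partitioner is just an illustration):
import java.security.MessageDigest
import java.nio.ByteBuffer
import org.apache.spark.Partitioner

// Sketch: a Partitioner that spreads keys using the first bytes of an MD5 digest
class Md5Partitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0)

  override def getPartition(key: Any): Int = {
    val digest = MessageDigest.getInstance("MD5").digest(key.toString.getBytes("UTF-8"))
    // interpret the first 4 bytes of the digest as a non-negative Int and map it to a partition
    val hash = ByteBuffer.wrap(digest).getInt & Integer.MAX_VALUE
    hash % numPartitions
  }

  override def equals(other: Any): Boolean = other match {
    case p: Md5Partitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
You would then use it like rdd.reduceByKey(new Md5Partitioner(64), _ + _).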
You can also check Zero323's answer for more on this.
Upvotes: 0
Reputation: 7732
Patrick,
I don't know if you have access to your Spark UI. But if you need to repartition your RDD, I suggest passing the numPartitions argument to reduceByKey instead. That way you can reshuffle your RDD "for free".
If you check your Spark UI when you use the code as you show it:
rdd.reduceByKey(_+_).repartition(64)
you will see in the execution graph that the reduce is done first, and only after that does the repartition process start.
If you use:
rdd.reduceByKey(_+_, numPartitions=64)
you will see that the execution happens without the extra repartition step.
Here is how to use it: Spark ReduceByKey
Upvotes: -1