Patrick McGloin

Reputation: 2234

How to not skew data after a reduceByKey?

Our reduceByKey operation produces an RDD that is quite skewed, with lots of data in one or two partitions. To increase the parallelism of the processing after the reduceByKey, we do a repartition, which forces a shuffle.

rdd.reduceByKey(_+_).repartition(64)

I know that it is possible to pass a Partitioner into the reduceByKey operation. But (other than creating a custom one) I think the options are the HashPartitioner and the RangePartitioner. And I think both of these will result in the data being skewed after they partition, since the keys are quite unique.

Is it possible to shuffle the output RDD in a reduceByKey evenly, without the additional repartition call?

Upvotes: 1

Views: 994

Answers (3)

lhaferkamp

Reputation: 771

Your assumption: "I think the options are the HashPartitioner and the RangePartitioner. And I think both of these will result in the data being skewed after they partition, since the keys are quite unique." is not necessarily correct.

The standard RangePartitioner tries to distribute the data evenly by sampling, and in our case, with quite skewed data (which depends on the geographical distribution of people), it turned out to work very well and distributed the data evenly.

See also the explanation here: How does range partitioner work in Spark?

So I would recommend using the standard RangePartitioner in your reduceByKey.
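For example, a rough sketch of passing a RangePartitioner directly into reduceByKey (the partition count of 64 and an RDD[(String, Int)] are just assumptions for illustration):

    import org.apache.spark.RangePartitioner

    // Assume rdd is an RDD[(String, Int)]; the RangePartitioner samples it
    // to pick range boundaries for 64 partitions.
    val partitioner = new RangePartitioner(64, rdd)

    // Pass the partitioner into reduceByKey so no extra repartition call is needed.
    val reduced = rdd.reduceByKey(partitioner, _ + _)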

Write a small test or add logging output that checks the size of your partitions. You can use mapPartitionsWithIndex to collect some statistics about your partitions and get an index for the logging output.
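Something along these lines (just a sketch; the variable names are made up):

    // Count the records per partition and print them with the partition index.
    val partitionSizes = reduced
      .mapPartitionsWithIndex { (index, iter) => Iterator((index, iter.size)) }
      .collect()

    partitionSizes.foreach { case (index, count) =>
      println(s"partition $index: $count records")
    }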

Upvotes: 2

Avik Aggarwal

Reputation: 619

I am assuming that, after the reduceByKey, your keys cluster in a few ranges, for example 1, 2, 3, 4, 5, 45, 46, 47, 48, 49, 356, 789.

So you will get skewed data when you use a RangePartitioner. A HashPartitioner, however, depends on the hash function used to calculate the hash of the keys.

(Scala RDDs use hashCode, Datasets use MurmurHash 3, and PySpark uses portable_hash.)

Now, if the default HashPartitioner also gives you data skew, then you will have to use a custom Partitioner and hash the keys with a function that reliably spreads them out, e.g. MD5.

To implement a custom Partitioner in Spark you will have to extend Partitioner and implement three methods:

  1. numPartitions

  2. equals

  3. getPartition

    In the getPartition method you can calculate the MD5 hash of the key and distribute the keys according to the numPartitions parameter (see the sketch after this list).
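A rough sketch of such a Partitioner (the class name and hashing the key's string form with MD5 are just illustrative choices):

    import java.nio.ByteBuffer
    import java.security.MessageDigest

    import org.apache.spark.Partitioner

    // Illustrative custom Partitioner that spreads keys using an MD5 hash.
    class Md5Partitioner(override val numPartitions: Int) extends Partitioner {

      override def getPartition(key: Any): Int = {
        // Hash the key's string form with MD5 and fold it into [0, numPartitions).
        val digest = MessageDigest.getInstance("MD5").digest(key.toString.getBytes("UTF-8"))
        val hash = ByteBuffer.wrap(digest).getInt
        ((hash % numPartitions) + numPartitions) % numPartitions
      }

      override def equals(other: Any): Boolean = other match {
        case p: Md5Partitioner => p.numPartitions == numPartitions
        case _                 => false
      }

      override def hashCode: Int = numPartitions
    }

Then you can pass it straight into reduceByKey, e.g. rdd.reduceByKey(new Md5Partitioner(64), _ + _).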

You can also check Zero323's answer for more on this.

Upvotes: 0

Thiago Baldim

Reputation: 7732

Patrick,

I don't know if you have access to your Spark UI. But if you need to repartition your RDD, I suggest passing the numPartitions argument directly to reduceByKey.

That way you reshuffle your RDD "for free".

If you check your Spark UI when you use the call the way you showed it:

rdd.reduceByKey(_+_).repartition(64)

You will see in the execution graph that the reduce is done first, and only after that does the repartition start.

If you use:

rdd.reduceByKey(_+_, numPartitions=64)

You will see that the execution happens without any extra job.

Here is how to use: Spark ReduceByKey

Upvotes: -1
