S.Kang

Reputation: 611

Is there an effective partitioning method when using reduceByKey in Spark?

When I use reduceByKey or aggregateByKey, I'm confronted with partition problems.

e.g. reduceByKey(_+_).map(code)

In particular, if the input data is skewed, the partitioning problem becomes even worse when using these methods.

So, as a workaround, I use the repartition method.

For example, http://dev.sortable.com/spark-repartition/ describes a similar approach.

This is good for partition distribution, but repartition is also expensive.

Is there a way to solve the partition problem wisely?

Upvotes: 5

Views: 5829

Answers (2)

zero323

Reputation: 330063

You have to distinguish between two different problems:

Data skew

If data distribution is highly skewed (let's assume the worst case scenario with only a single unique key) then by definition the output will be skewed and changing a partitioner cannot help you.

There are some techniques which can be used to partially address the problem, but overall partitioning is not the core issue here.
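One such technique is key salting: append a random suffix to each key so that a hot key is spread over several sub-keys, pre-aggregate, then strip the suffix and aggregate again. The sketch below illustrates the idea with plain Scala collections rather than actual Spark RDD calls; the names `saltedReduce` and `numSalts` are illustrative, not Spark API:

```scala
import scala.util.Random

object SaltingSketch {
  // Two-stage aggregation: salt keys, pre-aggregate, strip the salt, aggregate again.
  def saltedReduce(data: Seq[(String, Int)], numSalts: Int): Map[String, Int] = {
    val rng = new Random(0)
    // Stage 1: spread each key over numSalts sub-keys and pre-aggregate.
    // In Spark this would be a map followed by reduceByKey.
    val salted  = data.map { case (k, v) => ((k, rng.nextInt(numSalts)), v) }
    val partial = salted.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
    // Stage 2: drop the salt and combine the partial sums (a second reduceByKey).
    partial.groupBy(_._1._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    // Heavily skewed input: one hot key, one cold key.
    val skewed = Seq.fill(1000)(("hot", 1)) ++ Seq(("cold", 1))
    println(saltedReduce(skewed, numSalts = 8)) // Map(hot -> 1000, cold -> 1)
  }
}
```

The first stage does most of the summing spread across sub-keys, so the second stage only has to combine `numSalts` partial results per hot key.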

Partitioner bias

A poorly chosen partitioning function can result in a skewed data distribution even if the data itself is uniformly distributed. For example:

val rdd = sc.parallelize(Seq((5, None), (10, None), (15, None), (20, None)), 5)
rdd
  .partitionBy(new org.apache.spark.HashPartitioner(5))
  .glom.map(_.size).collect
Array[Int] = Array(4, 0, 0, 0, 0)

As you can see, even though the key distribution is not skewed, skew has been induced by regularities in the data and poor properties of hashCode.
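The effect is easy to check by hand. HashPartitioner assigns a key to the non-negative value of key.hashCode modulo numPartitions, and for Scala Ints hashCode is the value itself, so every multiple of 5 lands in partition 0 when there are 5 partitions. A small sketch mirroring that rule (a plain-Scala illustration, not the Spark class itself):

```scala
object HashPartitionCheck {
  // Mirrors the rule HashPartitioner applies: non-negative mod of hashCode.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(5, 10, 15, 20)
    println(keys.map(partitionOf(_, 5))) // List(0, 0, 0, 0) - all collide
    println(keys.map(partitionOf(_, 7))) // List(5, 3, 1, 6) - spread out
  }
}
```

With 7 partitions the same keys map to four distinct partitions, which is exactly why adjusting the partition count below fixes the distribution.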

In a case like this, choosing a different Partitioner:

rdd
  .partitionBy(new org.apache.spark.RangePartitioner(5, rdd))
  .glom.map(_.size).collect
Array[Int] = Array(1, 1, 1, 1, 0)

or adjusting properties of the existing one:

rdd
  .partitionBy(new org.apache.spark.HashPartitioner(7))
  .glom.map(_.size).collect
Array[Int] = Array(0, 1, 0, 1, 0, 1, 1)

can resolve the issue.

Upvotes: 3

Thiago Baldim

Reputation: 7732

You are right,

Repartition is really expensive to run, due to the shuffle and other minor steps. Creating an example like the one you described:

rdd.map(x => (x, x * x)).repartition(8).reduceByKey(_+_)

See the DAG here:

[DAG screenshot]

This will create a DAG with one map, one repartition, and one reduce.

But if you specify the partitioning inside the reduceByKey, you get the repartition for "free".

The main cost of repartition is the shuffle, and the main cost of reduceByKey is a shuffle too. You can see in the Scala API that reduceByKey has a numPartitions parameter.

So you can change your code for this:

rdd.map(x => (x, x * x)).reduceByKey(_+_, 8)

[DAG screenshot]

And you can see that the same code, with the repartitioning folded into reduceByKey, is much faster, because you have one less shuffle to do.

[Timing screenshot]

Upvotes: 4
