Reputation: 22518
I am having trouble finding in the Spark documentation which operations cause a shuffle and which do not. In this list, which ones cause a shuffle and which ones do not?
map and filter do not. However, I am not sure about the others.
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
Upvotes: 47
Views: 29793
Reputation: 35404
Here is the generalised statement on shuffling transformations.
Transformations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
Upvotes: 5
Reputation: 1903
Here is a list of operations that might cause a shuffle:
join: hash partition
leftOuterJoin: hash partition
rightOuterJoin: hash partition
groupByKey: hash partition
reduceByKey: hash partition
combineByKey: hash partition
sortByKey: range partition
intersection: hash partition
Source: Big Data Analysis with Spark and Scala, Optimizing with Partitions, Coursera
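To see the hash vs. range distinction for yourself, you can look at the partitioner attached to the result RDD. This is only a minimal sketch, assuming a local Spark setup; the sample data, the app name, and the value names (pairs, reducedPartitioner, sortedPartitioner) are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioner-check").setMaster("local[2]"))

// Illustrative key/value data, not from the original post.
val pairs = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))

// A freshly parallelized RDD has no partitioner yet.
println(pairs.partitioner)

// Class name of the partitioner that each operation leaves on its result.
val reducedPartitioner = pairs.reduceByKey(_ + _).partitioner.map(_.getClass.getSimpleName)
val sortedPartitioner  = pairs.sortByKey().partitioner.map(_.getClass.getSimpleName)

println(reducedPartitioner) // hash partitioning expected
println(sortedPartitioner)  // range partitioning expected
```

If the input already carries a suitable partitioner, Spark may reuse it, which is why the list above says "might" cause a shuffle.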
Upvotes: 25
Reputation: 4858
This might be helpful: https://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations
Or this: http://www.slideshare.net/SparkSummit/dev-ops-training, starting with slide 208.
From slide 209: "Transformations that use 'numPartitions' like distinct will probably shuffle"
Upvotes: 5
Reputation: 18750
It is actually extremely easy to find this out without the documentation. For any of these functions, just create an RDD and call toDebugString on it. Here is one example; you can do the rest on your own.
scala> val a = sc.parallelize(Array(1,2,3)).distinct
scala> a.toDebugString
MappedRDD[5] at distinct at <console>:12 (1 partitions)
MapPartitionsRDD[4] at distinct at <console>:12 (1 partitions)
**ShuffledRDD[3] at distinct at <console>:12 (1 partitions)**
MapPartitionsRDD[2] at distinct at <console>:12 (1 partitions)
MappedRDD[1] at distinct at <console>:12 (1 partitions)
ParallelCollectionRDD[0] at parallelize at <console>:12 (1 partitions)
So, as you can see, distinct creates a shuffle. It is also particularly important to check this way rather than relying on the docs, because whether a given function needs a shuffle can depend on the situation. For example, join usually requires a shuffle, but if you join two RDDs that branch from the same RDD, Spark can sometimes elide the shuffle.
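If you would rather not read the debug string by eye, the same check can be done in code by walking the RDD's dependencies and looking for a ShuffleDependency. This is a rough sketch, not an official API: hasShuffle is a hypothetical helper name, the sample data is made up, and it reports a shuffle anywhere in the lineage, not only in the last step.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("shuffle-check").setMaster("local[2]"))

// Hypothetical helper: true if any dependency in the lineage is a shuffle.
def hasShuffle(rdd: RDD[_]): Boolean =
  rdd.dependencies.exists {
    case _: ShuffleDependency[_, _, _] => true
    case dep                           => hasShuffle(dep.rdd)
  }

// Illustrative input, not from the original post.
val nums = sc.parallelize(1 to 10)

println(hasShuffle(nums.map(_ * 2)))                            // narrow, expect false
println(hasShuffle(nums.distinct()))                            // expect true
println(hasShuffle(nums.map(x => (x % 3, x)).reduceByKey(_ + _))) // expect true
println(hasShuffle(nums.coalesce(2)))                           // shuffle = false by default, expect false
println(hasShuffle(nums.repartition(2)))                        // expect true
```

You can run the remaining operations from the question through the same helper. The result depends on the input RDD (for example, whether it already has a partitioner), so treat it as a check on your actual lineage rather than a fixed table.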
Upvotes: 52