Reputation: 1093
I would like to understand shuffle in depth and how Spark uses shuffle managers. Here are some very helpful resources:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
Reading them, I understood that there are different shuffle managers. I want to focus on two of them: the hash manager and the sort manager (which is the default).
To frame my question, I want to start from a very common transformation:
val rdd2 = rdd.reduceByKey(_ + _)
This transformation performs map-side aggregation and then a shuffle to bring all values for the same key into the same partition.
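To make the setting concrete, here is a minimal plain-Scala sketch (no Spark; all names and the hash-partitioning scheme are illustrative, not Spark internals) of what "map-side aggregation, then shuffle by key" conceptually does:

```scala
// Conceptual sketch only: simulates per-partition map-side combining,
// then a "shuffle" that routes each key to hash(key) % numPartitions,
// then the reduce-side merge of partial sums.
val numPartitions = 2
val partitions: Seq[Seq[(String, Int)]] = Seq(
  Seq("a" -> 1, "b" -> 1, "a" -> 1), // input partition 0
  Seq("b" -> 1, "a" -> 1)            // input partition 1
)

// Map side: combine values locally within each partition.
val mapOutput: Seq[Map[String, Int]] =
  partitions.map(_.groupMapReduce(_._1)(_._2)(_ + _))

// Shuffle: route each combined record to a target partition by key hash.
val shuffled: Map[Int, Seq[(String, Int)]] =
  mapOutput.flatten.groupBy { case (k, _) => math.abs(k.hashCode) % numPartitions }

// Reduce side: merge the partial sums for each key.
val result: Map[String, Int] =
  shuffled.values.flatten.groupMapReduce(_._1)(_._2)(_ + _)

assert(result == Map("a" -> 3, "b" -> 2))
println(result)
```

The point of the sketch is only that combining happens twice: once locally before the shuffle (reducing the data moved), and once after it.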
My questions are:
Is map-side aggregation implemented internally with a mapPartitions transformation, aggregating equal keys using the combiner function, or is it implemented with an AppendOnlyMap or ExternalAppendOnlyMap?
If AppendOnlyMap or ExternalAppendOnlyMap maps are used for aggregation, are they also used for the reduce-side aggregation that happens in the ResultTask?
What exactly is the purpose of these two kinds of maps (AppendOnlyMap and ExternalAppendOnlyMap)?
Are AppendOnlyMap and ExternalAppendOnlyMap used by all shuffle managers, or just by the sort manager?
I read that after an AppendOnlyMap or ExternalAppendOnlyMap is full, it is spilled to a file; how exactly does this step happen?
Using the sort shuffle manager, we use an AppendOnlyMap to aggregate and combine partition records, right? Then, when execution memory fills up, we start sorting the map, spilling it to disk, and then cleaning it up. My question is: what is the difference between spill to disk and shuffle write? Both basically consist of creating files on the local file system, but they are treated differently; shuffle-write records are not put into the AppendOnlyMap.
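My current mental model of the spill/write distinction, as a plain-Scala sketch (no Spark I/O; the threshold and names are made up for illustration): spills are partial dumps produced whenever the in-memory map grows too large, while the shuffle write is the single final output produced by merging the remaining in-memory map with all spills.

```scala
import scala.collection.mutable

val threshold = 2 // pretend memory limit: at most 2 distinct keys in memory
val records = Seq("a" -> 1, "b" -> 1, "c" -> 1, "a" -> 1, "b" -> 1)

// Stand-ins for on-disk spill files: each holds a partial aggregate.
val spillFiles = mutable.Buffer.empty[Map[String, Int]]
var inMemory   = mutable.Map.empty[String, Int]

for ((k, v) <- records) {
  inMemory(k) = inMemory.getOrElse(k, 0) + v
  if (inMemory.size > threshold) {  // "maybeSpill": dump and reset the map
    spillFiles += inMemory.toMap
    inMemory = mutable.Map.empty
  }
}

// Final shuffle write: merge all spills with the current in-memory map
// into one sorted output (the file later fetched by reducers).
val shuffleOutput: Seq[(String, Int)] =
  (spillFiles :+ inMemory.toMap)
    .flatten
    .groupMapReduce(_._1)(_._2)(_ + _)
    .toSeq.sortBy(_._1)

assert(shuffleOutput == Seq("a" -> 2, "b" -> 2, "c" -> 1))
```

Is this roughly the right picture of how spills and the final shuffle write relate?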
Can you explain in depth what happens when reduceByKey is executed, covering all the steps involved, for example map-side aggregation, shuffling, and so on?
Upvotes: 15
Views: 3201
Reputation: 1712
Here is a step-by-step description of reduceByKey:

- reduceByKey calls combineByKeyWithClassTag, with an identity combiner and identical merge-value and create-combiner functions.
- combineByKeyWithClassTag creates an Aggregator and returns a ShuffledRDD. Both "map"-side and "reduce"-side aggregations use an internal mechanism and don't utilize mapPartitions.
- The Aggregator uses an ExternalAppendOnlyMap for both combineValuesByKey ("map-side reduction") and combineCombinersByKey ("reduce-side reduction"); both go through the ExternalAppendOnlyMap.insertAll method.
- ExternalAppendOnlyMap keeps track of the spilled parts and the current in-memory map (SizeTrackingAppendOnlyMap).
- The insertAll method updates the in-memory map and checks on each insert whether the estimated size of the current map exceeds the threshold. It uses the inherited Spillable.maybeSpill method. If the threshold is exceeded, this method calls spill as a side effect, and insertAll initializes a clean SizeTrackingAppendOnlyMap.
- spill calls spillMemoryIteratorToDisk, which gets a DiskBlockObjectWriter object from the block manager.
- These insertAll steps are applied for both map-side and reduce-side aggregations with the corresponding Aggregator functions, with the shuffle stage in between.
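The Aggregator idea above can be sketched in plain Scala. This is a toy simulation, not Spark's implementation: the names mirror Spark's Aggregator, combineValuesByKey, and combineCombinersByKey, but the real versions use an ExternalAppendOnlyMap rather than an immutable Map.

```scala
// Toy Aggregator: the same three functions drive both sides of the shuffle.
case class Aggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Map side: fold raw values into combiners.
  def combineValuesByKey(it: Iterator[(K, V)]): Map[K, C] =
    it.foldLeft(Map.empty[K, C]) { case (m, (k, v)) =>
      m.updated(k, m.get(k).fold(createCombiner(v))(mergeValue(_, v)))
    }

  // Reduce side: merge partial combiners arriving from the shuffle.
  def combineCombinersByKey(it: Iterator[(K, C)]): Map[K, C] =
    it.foldLeft(Map.empty[K, C]) { case (m, (k, c)) =>
      m.updated(k, m.get(k).fold(c)(mergeCombiners(_, c)))
    }
}

// For reduceByKey(_ + _) the combiner is the identity and both merge
// functions are the reduce function itself.
val agg = Aggregator[String, Int, Int](identity, _ + _, _ + _)

val mapSide1 = agg.combineValuesByKey(Iterator("a" -> 1, "b" -> 1, "a" -> 1))
val mapSide2 = agg.combineValuesByKey(Iterator("a" -> 1, "b" -> 1))

// "Shuffle": bring all partial combiners together, then reduce-side merge.
val result = agg.combineCombinersByKey(mapSide1.iterator ++ mapSide2.iterator)

assert(result == Map("a" -> 3, "b" -> 2))
```

This also shows why reduceByKey can combine on the map side at all: value type and combiner type coincide, so the same function works in both phases.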
As of Spark 2.0 there is only the sort-based shuffle manager: SPARK-14667
Upvotes: 6