Alejandro Alcalde
Alejandro Alcalde

Reputation: 6220

Avoid Shuffling with ReduceByKey in Spark

I am taking the coursera Course on Scala Spark, and I am trying to optimize this snippet:

val indexedMeansG = vectors.                                                                                                                                                                 
       map(v => findClosest(v, means) -> v).                                                                                                                                                      
       groupByKey.mapValues(averageVectors)

vectors is a RDD[(Int, Int)], in order to see the list of dependencies and the lineage of the RDD I've used:

println(s"""GroupBy:                                                                                                                                                                         
             | Deps: ${indexedMeansG.dependencies.size}                                                                                                                                           
             | Deps: ${indexedMeansG.dependencies}                                                                                                                                                
             | Lineage: ${indexedMeansG.toDebugString}""".stripMargin)

Which shows this:

/* GroupBy:                                                                                                                                                                                  
   * Deps: 1                                                                                                                                                                                      
   * Deps: List(org.apache.spark.OneToOneDependency@44d1924)                                                                                                                                      
   * Lineage: (6) MapPartitionsRDD[18] at mapValues at StackOverflow.scala:207 []                                                                                                                 
   *  ShuffledRDD[17] at groupByKey at StackOverflow.scala:207 []                                                                                                                                 
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:206 []                                                                                                                              
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []                                                                                                                                   
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                                                                                                
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []                                                                                                                                
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []                                                                                                                             
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []                                                                                                                             
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []                                                                                                                                        
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []                                                                                                                             
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []                                                                                                                               
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                  
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                          
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []                                                                                 
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []                                                                                                                             
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []                                                                                                                                  
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                     
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                             
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

From this List(org.apache.spark.OneToOneDependency@44d1924) I deduced no shuffling is being done, Am I right? However, below ShuffledRDD[17] is printed, which means in fact there is shuffling.

I've tried to replace that groupByKey call with a reduceByKey, like this:

val indexedMeansR = vectors.                                                                                                                                                              
      map(v => findClosest(v, means) -> v).                                                                                                                                                   
      reduceByKey((a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)

And its dependencies and lineage are:

/* ReduceBy:                                                                                                                                                                                 
   * Deps: 1                                                                                                                                                                                      
   * Deps: List(org.apache.spark.ShuffleDependency@4d5e813f)                                                                                                                                      
   * Lineage: (6) ShuffledRDD[17] at reduceByKey at StackOverflow.scala:211 []                                                                                                                    
   * +-(6) MapPartitionsRDD[16] at map at StackOverflow.scala:210 []                                                                                                                              
   *  MapPartitionsRDD[13] at map at StackOverflow.scala:139 []                                                                                                                                   
   *      CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                                                                                                
   *  MapPartitionsRDD[12] at values at StackOverflow.scala:116 []                                                                                                                                
   *  MapPartitionsRDD[11] at mapValues at StackOverflow.scala:115 []                                                                                                                             
   *  MapPartitionsRDD[10] at groupByKey at StackOverflow.scala:92 []                                                                                                                             
   *  MapPartitionsRDD[9] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  MapPartitionsRDD[8] at join at StackOverflow.scala:91 []                                                                                                                                    
   *  CoGroupedRDD[7] at join at StackOverflow.scala:91 []                                                                                                                                        
   *    +-(6) MapPartitionsRDD[4] at map at StackOverflow.scala:88 []                                                                                                                             
   *  |  MapPartitionsRDD[3] at filter at StackOverflow.scala:88 []                                                                                                                               
   *  |  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                  
   *  |  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                          
   *  |  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 []                                                                                 
   *    +-(6) MapPartitionsRDD[6] at map at StackOverflow.scala:89 []                                                                                                                             
   *  MapPartitionsRDD[5] at filter at StackOverflow.scala:89 []                                                                                                                                  
   *  MapPartitionsRDD[2] at map at StackOverflow.scala:69 []                                                                                                                                     
   *  src/main/resources/stackoverflow/stackoverflow.csv MapPartitionsRDD[1] at textFile at StackOverflow.scala:23 []                                                                             
   *  src/main/resources/stackoverflow/stackoverflow.csv HadoopRDD[0] at textFile at StackOverflow.scala:23 [] */

This time, the dependency is ShuffleDependency and I am not able to understand why.

Since the RDD is a pair which keys are Ints, and therefore have an ordering, I've also tried to modified the partitioner and use a RangePartitioner, but it doesn't improve either

Upvotes: 3

Views: 1866

Answers (1)

Joe C
Joe C

Reputation: 15684

A reduceByKey operation still involves a shuffle, as it's still required to ensure that all items with the same key become part of the same partition.

However, this will be a much smaller shuffle operation than a groupByKey operation. A reduceByKey will perform the reduction operation within each partition before shuffling, thus reducing the amount of data to be shuffled.

Upvotes: 3

Related Questions