blue-sky
blue-sky

Reputation: 53916

What is shuffle read & shuffle write in Apache Spark

In below screenshot of Spark admin running on port 8080 :

enter image description here

The "Shuffle Read" & "Shuffle Write" parameters are always empty for this code :

import org.apache.spark.SparkContext;

object first {
  println("Welcome to the Scala worksheet")

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("distances")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)

  def euclDistance(userA: User, userB: User) = {

    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    println("value is" + sqRoot)
    ((userA.name, userB.name), sqRoot)
  }

  case class User(name: String, features: Vector[Double])

  def createUser(data: String) = {

    val id = data.split(",")(0)
    val splitLine = data.split(",")

    val distanceVector = (splitLine.toList match {
      case h :: t => t
    }).map(m => m.toDouble).toVector

    User(id, distanceVector)

  }

  val dataFile = sc.textFile("c:\\data\\example.txt")
  val users = dataFile.map(m => createUser(m))
  val cart = users.cartesian(users) //
  val distances = cart.map(m => euclDistance(m._1, m._2))
  //> distances  : org.apache.spark.rdd.RDD[((String, String), Double)] = MappedR
  //| DD[4] at map at first.scala:46
  val d = distances.collect //

  d.foreach(println) //> ((a,a),0.0)
  //| ((a,b),0.0)
  //| ((a,c),1.0)
  //| ((a,),0.0)
  //| ((b,a),0.0)
  //| ((b,b),0.0)
  //| ((b,c),1.0)
  //| ((b,),0.0)
  //| ((c,a),1.0)
  //| ((c,b),1.0)
  //| ((c,c),0.0)
  //| ((c,),0.0)
  //| ((,a),0.0)
  //| ((,b),0.0)
  //| ((,c),0.0)
  //| ((,),0.0)

}

Why are "Shuffle Read" & "Shuffle Write" fields empty ? Can above code be tweaked in order to populate these fields so as to understand how

Upvotes: 38

Views: 57673

Answers (2)

taruxtin
taruxtin

Reputation: 841

Shuffling means the reallocation of data between multiple Spark stages. "Shuffle Write" is the sum of all written serialized data on all executors before transmitting (normally at the end of a stage) and "Shuffle Read" means the sum of read serialized data on all executors at the beginning of a stage.

Your programm has only one stage, triggered by the "collect" operation. No shuffling is required, because you have only a bunch of consecutive map operations which are pipelined in one Stage.

Try to take a look at these slides: http://de.slideshare.net/colorant/spark-shuffle-introduction

It could also help to read chapter 5 from the original paper: http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf

Upvotes: 84

Soumya Simanta
Soumya Simanta

Reputation: 11761

I believe you have to run your application in cluster/distributed mode to see any Shuffle read or write values. Typically "shuffle" are triggered by a subset of Spark actions (e.g., groupBy, join, etc)

Upvotes: 2

Related Questions