Reputation: 53916
In the screenshot below of the Spark admin UI running on port 8080, the "Shuffle Read" and "Shuffle Write" columns are always empty for this code:
import org.apache.spark.SparkContext

object first {
  println("Welcome to the Scala worksheet")

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("distances")
    .setSparkHome("C:\\spark-1.1.0-bin-hadoop2.4\\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)

  def euclDistance(userA: User, userB: User) = {
    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)
    println("value is" + sqRoot)
    ((userA.name, userB.name), sqRoot)
  }

  case class User(name: String, features: Vector[Double])

  def createUser(data: String) = {
    val id = data.split(",")(0)
    val splitLine = data.split(",")
    val distanceVector = (splitLine.toList match {
      case h :: t => t
    }).map(m => m.toDouble).toVector
    User(id, distanceVector)
  }

  val dataFile = sc.textFile("c:\\data\\example.txt")
  val users = dataFile.map(m => createUser(m))
  val cart = users.cartesian(users)
  val distances = cart.map(m => euclDistance(m._1, m._2))
  //> distances : org.apache.spark.rdd.RDD[((String, String), Double)] = MappedRDD[4] at map at first.scala:46
  val d = distances.collect
  d.foreach(println) //> ((a,a),0.0)
  //| ((a,b),0.0)
  //| ((a,c),1.0)
  //| ((a,),0.0)
  //| ((b,a),0.0)
  //| ((b,b),0.0)
  //| ((b,c),1.0)
  //| ((b,),0.0)
  //| ((c,a),1.0)
  //| ((c,b),1.0)
  //| ((c,c),0.0)
  //| ((c,),0.0)
  //| ((,a),0.0)
  //| ((,b),0.0)
  //| ((,c),0.0)
  //| ((,),0.0)
}
Why are "Shuffle Read" & "Shuffle Write" fields empty ? Can above code be tweaked in order to populate these fields so as to understand how
Upvotes: 38
Views: 57673
Reputation: 841
Shuffling means the redistribution of data between multiple Spark stages. "Shuffle Write" is the total serialized data written on all executors before it is transmitted (normally at the end of a stage), and "Shuffle Read" is the total serialized data read on all executors at the beginning of a stage.
Your program has only one stage, triggered by the "collect" operation. No shuffling is required, because it consists only of a sequence of consecutive map operations, which are pipelined into a single stage.
Take a look at these slides: http://de.slideshare.net/colorant/spark-shuffle-introduction
It could also help to read chapter 5 of the original paper: http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
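If you want to see these fields populated, introduce a wide transformation so the job contains more than one stage. A minimal sketch, assuming the "distances" RDD from your code; the re-keying and the choice of reduceByKey are just for illustration, any operation that repartitions data by key (groupByKey, join, ...) would also do:

import org.apache.spark.SparkContext._ // needed in Spark 1.1 for pair-RDD operations such as reduceByKey

// Re-key each distance by the first user name and sum the distances per user.
// reduceByKey is a wide transformation, so Spark splits the job into two
// stages: the first stage writes shuffle data ("Shuffle Write") and the
// second stage reads it back ("Shuffle Read").
val summedPerUser = distances
  .map { case ((nameA, nameB), dist) => (nameA, dist) }
  .reduceByKey(_ + _)

summedPerUser.collect().foreach(println)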
Upvotes: 84
Reputation: 11761
I believe you have to run your application in cluster/distributed mode to see any Shuffle Read or Write values. Typically, shuffles are triggered by a subset of Spark transformations (e.g., groupBy, join, etc.).
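For example (a minimal sketch with hypothetical data, not taken from the question), a join forces both inputs to be repartitioned by key and therefore produces shuffle read/write in the stage view:

import org.apache.spark.SparkContext._ // pair-RDD operations (join) in Spark 1.x

// Two small hypothetical pair RDDs; joining them requires records with the
// same key to end up in the same partition, which triggers a shuffle.
val ages   = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
val cities = sc.parallelize(Seq(("alice", "Berlin"), ("bob", "Paris")))
val joined = ages.join(cities)    // wide transformation => shuffle
joined.collect().foreach(println) // (alice,(30,Berlin)) ...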
Upvotes: 2