Reputation: 3381
Since the most recent related question SO suggests is from 2011, I'm asking anew.
I was trying to prove that aggregating over a parallelized Spark array would be faster than over a normal array (all on a Dell XPS with 4 cores).
import org.apache.spark.{SparkConf, SparkContext}
object SparkStuffer extends App {
val appName: String = "My Spark Stuffer"
val master: String = "local"
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)
// Returns '4'
println("Available processors: " + Runtime.getRuntime().availableProcessors())
val data = 1 to 100000000
val distData = sc.parallelize(data)
val sequData = data
val parallelIni = System.currentTimeMillis()
distData.reduce((a, b) => a + b)
val parallelFin = System.currentTimeMillis()
val seqIni = System.currentTimeMillis()
sequData.reduce((a, b) => a + b)
val seqFin = System.currentTimeMillis()
println("Par: " + (parallelFin - parallelIni))
println("Seq: " + (seqFin - seqIni))
// Par: 3262
// Seq: 1099
}
For reference I'm adding the build.sbt:
name := "spark_stuff"
version := "0.1"
scalaVersion := "2.12.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
Why is the parallel aggregation not faster, and if not here, what example would show that it is?
Upvotes: 0
Views: 397
Reputation: 7732
There is a misconception here. Your step distData.reduce((a, b) => a+b)
is doing two things: distributing the data and processing it, not only the processing you were expecting.
The Spark framework splits the execution of a block of code into two kinds of steps: transformations and actions. During a transformation, Spark is only preparing the execution plan: recording what needs to be done, checking that the data exists, that the operation makes sense, and so on. That is what happens at sc.parallelize(data)
. At this moment your code is not parallelizing anything; it is just preparing to parallelize. The actual distribution happens when you run distData.reduce((a, b) => a+b)
, which is an action, and only then is the data processed.
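The transformation/action split is essentially lazy evaluation. As a rough analogy in plain Scala (no Spark; `LazyDemo` and its counter are made up for illustration), a lazy `view` plays the role of the transformation and forcing it plays the role of the action:

```scala
object LazyDemo {
  // Returns (evaluations before the "action", evaluations after, the result),
  // to show *when* the per-element work actually runs.
  def run(): (Int, Int, Long) = {
    var evaluated = 0
    // Like sc.parallelize(...).map(...): building the lazy view runs nothing yet.
    val pipeline = (1 to 10).view.map { x => evaluated += 1; x * 2L }
    val before = evaluated   // still 0: no element has been touched
    val total = pipeline.sum // like reduce: the "action" that forces the work
    (before, evaluated, total)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The same thing happens in Spark: the cost of setting up and distributing the data is only paid when the first action runs, which is why your timing of the first `reduce` includes it.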
I ran the same example on my cluster; here are a few results you can use as reference.
First, timed exactly like your code: [timing screenshot]
And here with a small change, forcing the data to be distributed before the reduce so the measurement excludes the distribution overhead:
val data = 1 to 100000000
val distData = sc.parallelize(data).cache() // cache so the reduce reuses the distributed data
distData.count() // an action: forces the distribution to happen here, before we time the reduce
distData.reduce((a, b) => a + b)
And here is how fast it ran: [timing screenshot]
But keep in mind that a distributed algorithm will not always beat a sequential one, mostly because of the overhead. Your dataset is pretty small and fits entirely in memory, so distributed code only beats sequential code beyond a certain data size. What size? It depends. The conclusion: the parallel execution is slower in this case because the action first parallelizes the data and only then executes the reduce.
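To see parallelism actually win, the per-element work has to outweigh the coordination overhead. Here is a plain-Scala sketch of that trade-off (no Spark involved; `slowSquare` is an artificially expensive function I made up, and the hand-rolled thread split stands in for the distribution step):

```scala
object ParallelWins {
  // Artificially expensive per-element work. With cheap work (plain addition,
  // as in the question) the thread/distribution overhead dominates instead.
  def slowSquare(x: Long): Long = {
    var s = 0L
    var i = 0
    while (i < 2000) { s += (x * x) % 1000003; i += 1 }
    s
  }

  def sequentialSum(xs: Array[Long]): Long = xs.map(slowSquare).sum

  def parallelSum(xs: Array[Long], threads: Int): Long = {
    val chunkSize = math.max(1, (xs.length + threads - 1) / threads)
    val chunks = xs.grouped(chunkSize).toArray
    val partials = new Array[Long](chunks.length)
    val ts = chunks.zipWithIndex.map { case (chunk, i) =>
      new Thread(() => partials(i) = chunk.map(slowSquare).sum)
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
    partials.sum // combine partial results, like Spark's final reduce step
  }

  def main(args: Array[String]): Unit = {
    val xs = Array.tabulate(20000)(i => (i + 1).toLong)
    val t0 = System.currentTimeMillis(); val seq = sequentialSum(xs)
    val t1 = System.currentTimeMillis(); val par = parallelSum(xs, 4)
    val t2 = System.currentTimeMillis()
    println(s"seq: ${t1 - t0} ms, par: ${t2 - t1} ms, equal: ${seq == par}")
  }
}
```

With expensive per-element work the threaded version should pull ahead on a multi-core machine; replace `slowSquare` with plain identity and the overhead makes it lose, which is the same effect you are seeing with Spark on a cheap sum.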
Upvotes: 2