Reputation: 655
I use Spark in local mode from an IDE (Eclipse).
I can see Spark UI creating many jobs for a simple aggregation. Why?
import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      .getOrCreate()
  }
  spark.sparkContext.setLogLevel("WARN")
}
The Spark application is as follows:
object RowNumberCalc
  extends App
  with SparkSessionWrapper {

  import spark.implicits._

  val cityDf = Seq(
    ("London", "Harish", 5500, "2019-10-01"),
    ("NYC", "RAJA", 11121, "2019-10-01"),
    ("SFO", "BABU", 77000, "2019-10-01"),
    ("London", "Rick", 7500, "2019-09-01"),
    ("NYC", "Jenna", 6511, "2019-09-01"),
    ("SFO", "Richard", 234567, "2019-09-01"),
    ("London", "Harish", 999999, "2019-08-01"),
    ("NYC", "Sam", 1234, "2019-08-01"),
    ("SFO", "Dylan", 45678, "2019-08-01")).toDF("city", "name", "money", "month_id")

  cityDf.createOrReplaceTempView("city_table")

  val totalMoneySql =
    """
      |select city, sum(money) from city_table group by 1 """.stripMargin

  spark.sql(totalMoneySql).show(false)

  System.in.read
  spark.stop()
}
As shown, this is a simple calculation of the sum of money for each city. Yet the Spark UI shows 5 jobs, each with 2 stages!
The SQL tab also shows 5 jobs.
The physical plan, however, shows the stage division I would expect:
== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
   +- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
      +- Exchange hashpartitioning(city#9, 200)
         +- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
            +- LocalTableScan [city#9, money#11]
Where do the 5 jobs come from, and how are they triggered?
Upvotes: 4
Views: 181
Reputation: 74669
tl;dr You have very few rows to work with (9 input rows, 3 after aggregation) spread across the default 200 shuffle partitions, so it takes 5 Spark jobs for Dataset.show to try to gather the 20 rows it wants to display.
In other words, what you experience is specific to Dataset.show (which, by the way, is not meant for large datasets, is it?).
By default, Dataset.show displays 20 rows. It starts by scanning 1 partition and takes up to 20 rows. If that does not yield enough rows, it multiplies the number of partitions to scan by 4 (if I'm not mistaken) and scans the next partitions for the missing rows, one Spark job per round. This repeats until 20 rows are collected or no partitions are left.
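The growth of the scan can be sketched in plain Scala without Spark. This is only a simplified model, not Spark's actual code: `ShowScanSketch` and `scanBatches` are hypothetical names, and the sketch assumes that the row limit is never reached and that each round tries 4 times as many partitions as have been scanned so far.

```scala
object ShowScanSketch {
  // Hypothetical helper, not a Spark API.
  // Returns the number of partitions scanned in each round (one round = one job),
  // ASSUMING the row limit is never satisfied and each round tries
  // `scaleUpFactor` times the number of partitions scanned so far.
  def scanBatches(totalPartitions: Int, scaleUpFactor: Int = 4): List[Int] = {
    require(totalPartitions >= 0, "totalPartitions must not be negative")
    require(scaleUpFactor >= 1, "scaleUpFactor must be at least 1")

    @annotation.tailrec
    def loop(scanned: Int, acc: List[Int]): List[Int] =
      if (scanned >= totalPartitions) acc.reverse
      else {
        // The first round looks at a single partition; later rounds scale up.
        val wanted = if (scanned == 0) 1 else scanned * scaleUpFactor
        // Never scan more partitions than are left.
        val batch = math.min(wanted, totalPartitions - scanned)
        loop(scanned + batch, batch :: acc)
      }

    loop(0, Nil)
  }
}
```

Under these assumptions, `ShowScanSketch.scanBatches(200)` gives rounds of 1, 4, 20, 100 and 75 partitions, that is five rounds, which would line up with the 5 jobs in the question.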
The last HashAggregate outputs only 3 rows.
Depending on which partitions these 3 rows land in, Spark could run one, two or more jobs. It strongly depends on the hash of the rows (per HashPartitioner).
If you really want to see a single Spark job for this number of rows (9 for the input), start the Spark application with the spark.sql.shuffle.partitions configuration property set to 1.
The computation after the aggregation then runs on a single partition, and all the result rows end up in that one partition.
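As a sketch, assuming the session is built the same way as in the question, the property can be set on the builder. Only the `.config(...)` line is new; the rest is the question's `SparkSessionWrapper`.

```scala
import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {
  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      // Use a single partition after shuffles instead of the default 200.
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()
  }
  spark.sparkContext.setLogLevel("WARN")
}
```

The same property can also be changed on a running session with `spark.conf.set("spark.sql.shuffle.partitions", "1")` before the query is executed. A single shuffle partition is reasonable for a toy dataset like this one, but probably not for real workloads.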
Upvotes: 4