SanBan

Reputation: 655

Why does Spark run 5 jobs for a simple aggregation?

I use Spark in local mode from an IDE (Eclipse).

I can see the Spark UI creating many jobs for a simple aggregation. Why?

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      .getOrCreate()
  }

  spark.sparkContext.setLogLevel("WARN")

} 

The Spark application is as follows:

object RowNumberCalc
  extends App
  with SparkSessionWrapper {

  import spark.implicits._

  val cityDf = Seq(
    ("London", "Harish", 5500, "2019-10-01"),
    ("NYC", "RAJA", 11121, "2019-10-01"),
    ("SFO", "BABU", 77000, "2019-10-01"),
    ("London", "Rick", 7500, "2019-09-01"),
    ("NYC", "Jenna", 6511, "2019-09-01"),
    ("SFO", "Richard", 234567, "2019-09-01"),
    ("London", "Harish", 999999, "2019-08-01"),
    ("NYC", "Sam", 1234, "2019-08-01"),
    ("SFO", "Dylan", 45678, "2019-08-01")
  ).toDF("city", "name", "money", "month_id")

  cityDf.createOrReplaceTempView("city_table")
  val totalMoneySql =
    """
      |select city, sum(money) from city_table group by 1 """.stripMargin
  spark.sql(totalMoneySql).show(false)


  System.in.read
  spark.stop()

}

As shown, this is a simple calculation of the sum of money for each city. Yet the Spark UI shows 5 jobs, each with 2 stages (see the Jobs tab screenshot).

The SQL tab also shows 5 jobs.


But the physical plan shows the expected stage division:

== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
   +- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
      +- Exchange hashpartitioning(city#9, 200)
         +- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
            +- LocalTableScan [city#9, money#11]

Where and how are these 5 jobs being triggered?

Upvotes: 4

Views: 181

Answers (1)

Jacek Laskowski

Reputation: 74669

tl;dr You've got very few rows to work with (9 as the main input and 3 aggregates) spread across the default 200 shuffle partitions, and so Spark runs 5 jobs to meet the requirement of Dataset.show to display 20 rows.


In other words, what you experience is specific to Dataset.show (which, by the way, is not meant for large datasets, is it?).

By default Dataset.show displays 20 rows. It starts with 1 partition and takes up to 20 rows from it. If there are not enough rows, it multiplies the number of partitions to scan by 4 (if I'm not mistaken) and scans the next 4 partitions to find the missing rows. That repeats until 20 rows are collected or all partitions have been scanned.
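A minimal sketch of that incremental scan, just to illustrate the idea described above (the real logic lives in Spark's SparkPlan.executeTake and is a bit more involved):

// Simplified sketch: each batch of partitions scanned corresponds to one Spark job.
def takeRows[T](partitions: Seq[Seq[T]], n: Int, scaleUpFactor: Int = 4): Seq[T] = {
  var collected = Vector.empty[T]
  var scanned = 0
  var batchSize = 1
  while (collected.size < n && scanned < partitions.size) {
    val batch = partitions.slice(scanned, scanned + batchSize) // one job per batch
    collected ++= batch.flatten.take(n - collected.size)
    scanned += batchSize
    batchSize *= scaleUpFactor // scan 4x more partitions next time
  }
  collected
}

Assuming the scan has to go through all 200 shuffle partitions (which it does here, since only 3 result rows exist and show wants 20), the batches would be roughly 1, 4, 16, 64 and the remaining partitions, i.e. 5 jobs.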

The number of output rows of the last HashAggregate is 3.

Depending on which partitions those 3 rows end up in, Spark could run one, two or more jobs. It strongly depends on the hash of the rows (per HashPartitioner).
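If you want to check where those 3 aggregated rows actually land (a quick way to inspect it, not something the answer itself relies on), spark_partition_id comes in handy:

import org.apache.spark.sql.functions.spark_partition_id

// Shows which of the 200 post-shuffle partitions each aggregated row ends up in.
spark.sql(totalMoneySql)
  .withColumn("partition_id", spark_partition_id())
  .show(false)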


If you really want to see a single Spark job for this number of rows (9 for the input), start the Spark application with the spark.sql.shuffle.partitions configuration property set to 1.

That will make the computation run with a single partition after the aggregation, with all the result rows in one partition.
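For example, one place to set it would be the builder in your SparkSessionWrapper:

lazy val spark: SparkSession = {
  SparkSession
    .builder()
    .master("local[2]")
    .appName("Spark Me")
    .config("spark.sql.shuffle.partitions", "1") // default is 200
    .getOrCreate()
}

Since it is a runtime SQL property, you could also set it later with spark.conf.set("spark.sql.shuffle.partitions", "1") before executing the query.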

Upvotes: 4
