Deepak Garg

Reputation: 159

Number of tasks in Apache Spark while writing into HDFS

I am trying to read a CSV file, add some columns, and then save the result in ORC format.

I could not understand how Spark decides the number of tasks for different stages.

Why is the number of tasks 1 for the CSV stage and 39 for the ORC stage?

import org.apache.spark.sql.functions.{col, when}   // needed for col() and when()

val c1c8 = spark.read.option("header", true).csv("/user/DEEPAK_TEST/C1C6_NEW/")

val c1c8new = c1c8
  .withColumnRenamed("c1c6_F", "c1c8").withColumnRenamed("Network_Out", "c1c8_network")
  .withColumnRenamed("Access NE Out", "c1c8_access_ne")
  .withColumn("c1c8_signalling",
    when(col("signalling_Out") === "SIP Cl4", "SIP CL4")
      .when(col("signalling_Out") === "SIP cl4", "SIP CL4")
      .when(col("signalling_Out") === "Other", "other")
      .otherwise(col("signalling_Out")))
  .withColumnRenamed("access type Out", "c1c8_access_type").withColumnRenamed("Type_of_traffic_C", "c1c8_typeoftraffic")
  .withColumnRenamed("BOS traffic type Out", "c1c8_bos_trafc_typ").withColumnRenamed("Scope_Out", "c1c8_scope")
  .withColumnRenamed("Join with UP-DWN SIP cl5 T1T7 Out", "c1c8_join_indicator")
  .select("c1c8", "c1c8_network", "c1c8_access_ne", "c1c8_signalling", "c1c8_access_type",
    "c1c8_typeoftraffic", "c1c8_bos_trafc_typ", "c1c8_scope", "c1c8_join_indicator")

c1c8new.write.orc("/user/DEEPAK_TEST/C1C8_MAPPING_NEWT/")

Number of tasks per stage:

  • CSV stage: 1 task
  • ORC stage: 39 tasks

Upvotes: 0

Views: 844

Answers (1)

sanojmg

Reputation: 48

Below is my understanding from looking at the Spark 2.x source code.

Stage 0 is a file scan that creates a FileScanRDD, an RDD that scans a list of file partitions. This stage can have more than one task when you are reading from multiple partitioned directories, such as a partitioned Hive table.
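For example (a hypothetical layout, not taken from your job), a source directory split into partition subdirectories can give the scan stage several splits, and hence several tasks:

// Hypothetical partitioned layout: /data/events/dt=2021-01-01/, /data/events/dt=2021-01-02/, ...
val events = spark.read.option("header", true).csv("/data/events/")
println(events.rdd.getNumPartitions)   // one task per planned split in the scan stage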

The number of tasks in Stage 1 will be equal to the number of RDD partitions; in your case, c1c8new.rdd.getNumPartitions will be 39. This number is calculated using:

  • the config value spark.sql.files.maxPartitionBytes (128 MB by default)
  • sparkContext.defaultParallelism returned by the task scheduler (equal to the number of cores when running in local mode)
  • totalBytes, the total size of the files being read (plus an open cost per file)

DataSourceScanExec.scala#L423

val defaultMaxSplitBytes =
  fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
  s"open cost is considered as scanning $openCostInBytes bytes.")

You can see the actual calculated values in the above log message if you set the log level to INFO with spark.sparkContext.setLogLevel("INFO").
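For instance, a minimal check (reusing your input path) to turn on the logging and cross-check the planned partition count:

spark.sparkContext.setLogLevel("INFO")
// Re-run the read; look for a log line like:
//   Planning scan with bin packing, max size: 134217728 bytes, open cost is considered as scanning 4194304 bytes.
val c1c8 = spark.read.option("header", true).csv("/user/DEEPAK_TEST/C1C6_NEW/")
println(c1c8.rdd.getNumPartitions)   // should match the task count of the write stage (39 in your case)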

In your case, I think the split size works out to the 128 MB default, so the number of tasks/partitions is roughly 4.6 GB / 128 MB ≈ 37, which is close to the 39 tasks you see.
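Plugging rough numbers into the formula above (the ~4.6 GB total is an assumption about your input; the config values are the defaults, and defaultParallelism = 2 is just for illustration):

// Back-of-the-envelope version of the bin-packing calculation
val defaultMaxSplitBytes = 128L * 1024 * 1024                  // spark.sql.files.maxPartitionBytes (default)
val openCostInBytes      = 4L * 1024 * 1024                    // spark.sql.files.openCostInBytes (default)
val totalBytes           = (4.6 * 1024 * 1024 * 1024).toLong   // assumed input size of ~4.6 GB
val bytesPerCore         = totalBytes / 2                      // assumed defaultParallelism = 2
val maxSplitBytes        = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
println(totalBytes / maxSplitBytes)                            // prints 36, roughly matching the 39 tasks observed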

As a side note, you can change the number of partitions (and hence the number of tasks in the subsequent stage) by using repartition() or coalesce() on the DataFrame. More importantly, the number of partitions after a shuffle is determined by spark.sql.shuffle.partitions (200 by default). If your job has a shuffle, it is better to control the number of tasks with this configuration, because inserting repartition() or coalesce() between stages adds extra overhead.
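A quick sketch of both knobs (the counts 10 and 50 are arbitrary examples, not recommendations):

// (a) Explicitly control the partition count of the final write
c1c8new.coalesce(10).write.orc("/user/DEEPAK_TEST/C1C8_MAPPING_NEWT/")   // 10 write tasks / output files

// (b) If the plan contains a shuffle (join, groupBy, ...), size it via the config instead
spark.conf.set("spark.sql.shuffle.partitions", "50")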

For large Spark SQL workloads, picking an optimal value for spark.sql.shuffle.partitions for each stage has always been a pain point. Spark 3.x has better support for this when Adaptive Query Execution is enabled, but I haven't tried it on any production workloads.
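If you do want to try it, AQE is driven by a couple of session configs (a minimal sketch; spark here is a Spark 3.x SparkSession):

spark.conf.set("spark.sql.adaptive.enabled", "true")                      // turn on Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   // let Spark coalesce small shuffle partitions at runtime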

Upvotes: 1
