Lyu Chenghao

Reputation: 41

Why does a disk busy spike happen between a Spark job finishing and the Spark context shutting down?

I detected unexpected disk I/O (a DISKBUSY spike) after all my Spark tasks had finished but the Spark context had not yet stopped, as shown in the figure for case 2 at 21:56:47. Could anyone help explain it and give suggestions on how to avoid or postpone it? Or does the Spark context have some periodic async I/O activity that might lead to the spike? Thanks!

Here is an example of running a SparkSQL batch job in two cases. In the first case, I execute the SQL workload and stop the Spark context immediately after the .show() action finishes. In the second case, I add a 1-minute sleep after .show() using Thread.sleep(60000) and then stop the Spark context. The time cost of executing the SQL workload is similar in both cases, but in the second case there is an unexpected DISKBUSY spike on the disk that holds the local storage for shuffle writes. See the spike in the figure for case 2.

Here are more details.

The system setup

Here is my current analysis

  1. It is not caused by the disk itself or by other background processes. I tried disk2, disk3, disk4, and disk8 as the YARN local storage to test whether the spike follows the program rather than a particular disk, and the same spike appeared every time I ran case 2.
  2. The spike is caused by Spark itself. I tried the standalone deploy mode and the spike still exists (with no YARN involved).
  3. It seems related to shuffling. The total shuffle write size of my target batch job is close to 2GB. I also tried different workloads with shuffle write sizes close to 1MB, 250MB, and 1GB. The DISKBUSY is negligible for the batch job with a 1MB shuffle write and reaches up to 80% for the batch job with a 250MB total shuffle write.
  4. I traced the size of the local storage files. When the disk spike appears, disk writing is detected but the directory size does not increase. Therefore, (1) it is probably not related to disk cache cleaning, and (2) it might be some disk swapping (not too sure). A minimal sketch of the directory-size polling is shown after this list.
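For reference, here is a minimal sketch in Scala of the directory-size polling mentioned in point 4. The default path /hadoop/yarn/local is only an assumption and should be replaced with the actual value of yarn.nodemanager.local-dirs (or spark.local.dir in standalone mode) on your nodes.

import java.io.File

// Minimal sketch: poll the total size of the shuffle local directory once per
// second, so it can be compared against the DISKBUSY timeline from nmon.
object LocalDirTracer {
  // Recursively sum the sizes of all regular files under a directory.
  def dirSize(f: File): Long =
    if (f.isFile) f.length()
    else Option(f.listFiles()).getOrElse(Array.empty[File]).map(dirSize).sum

  def main(args: Array[String]): Unit = {
    // Assumed path; use your yarn.nodemanager.local-dirs / spark.local.dir value.
    val localDir = new File(args.headOption.getOrElse("/hadoop/yarn/local"))
    while (true) {
      println(s"${System.currentTimeMillis()} local-dir bytes = ${dirSize(localDir)}")
      Thread.sleep(1000)
    }
  }
}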

Based on my current analysis, I suspect it is caused by something I am not familiar with, such as some asynchronous Spark behavior on disk. Could anyone help explain it? Thanks!

Here is the first case (Case 1: without sleep).

Here is the second case (Case 2: with a 60-second sleep).

To make the figures clearer: "worker1 node local" stands for disk1 in worker1 and "worker2 node local" for disk1 in worker2; "worker1 node dfs" stands for disk8 in worker1 and "worker2 node dfs" for disk8 in worker2, which is where HDFS is located. The left y-axis is the DISKBUSY (from 0% to 100%) reported by nmon, and the right y-axis is the size of the HDFS directory on disk8 (which can be ignored for this problem).

Here is my code.

import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') + 30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

//    For case 2
//    Thread.sleep(60 * 1000)

    spark.stop()
  }
}

Upvotes: 3

Views: 309

Answers (1)

Lyu Chenghao

Reputation: 41

I figured out the reason for the unexpected I/O activity.

This is the file system buffer cache (page cache) behavior. In general, when a process writes to a file, the data is not written to disk immediately; instead, it is written to a cache in memory. The OS/file system maintains this cache as a performance optimization, since it allows write requests to return after writing to memory rather than waiting for slow I/Os to complete. This dirty data is then periodically flushed to disk in the background by the OS.
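If you want to watch this happening on a node, one way (on Linux) is to poll the Dirty and Writeback counters in /proc/meminfo during the sleep window; as the OS flushes the shuffle data you should see Dirty shrink while DISKBUSY rises. A minimal sketch, assuming a Linux host:

import scala.io.Source

// Minimal sketch (Linux only): print the kernel's Dirty/Writeback page-cache
// counters once per second to correlate them with the DISKBUSY spike.
object DirtyPagesWatcher {
  def readMeminfo(keys: Set[String]): Map[String, String] = {
    val src = Source.fromFile("/proc/meminfo")
    try src.getLines()
      .map(_.split(":\\s+", 2))
      .collect { case Array(k, v) if keys(k) => k -> v.trim }
      .toMap
    finally src.close()
  }

  def main(args: Array[String]): Unit = {
    while (true) {
      val m = readMeminfo(Set("Dirty", "Writeback"))
      println(s"${System.currentTimeMillis()} Dirty=${m.getOrElse("Dirty", "?")} Writeback=${m.getOrElse("Writeback", "?")}")
      Thread.sleep(1000)
    }
  }
}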

So basically, the disk activity (the background flush) is not avoidable unless the cached file pages are deleted before they are flushed, which is effectively what happens in case 1: the shuffle files are removed when the Spark context stops right away, so their dirty pages never have to be written out.

You can force all dirty data to be written out immediately by using the Linux sync command.
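If you want those writes to happen inside the measured window rather than during the idle minute, one option is to trigger the flush explicitly before idling, for example by shelling out to sync from the driver. This is just a sketch under the assumption of a Linux cluster; note that sync only flushes the node it runs on, so shuffle files written by executors on other workers would need the same call there.

import scala.sys.process._

object ForceSync {
  def main(args: Array[String]): Unit = {
    // Flush all dirty page-cache data on this node to disk immediately.
    val exitCode = "sync".!
    println(s"sync exited with code $exitCode")
  }
}

Alternatively, the kernel's writeback behaviour can be tuned through the vm.dirty_* sysctls (for example vm.dirty_expire_centisecs), though that affects the whole node rather than just Spark.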

Upvotes: 1
