MiguelPeralvo
MiguelPeralvo

Reputation: 877

Reliability issues with Checkpointing/WAL in Spark Streaming 1.6.0

Description

We have a Spark Streaming 1.5.2 application in Scala that reads JSON events from a Kinesis Stream, does some transformations/aggregations and writes the results to different S3 prefixes. The current batch interval is 60 seconds. We have 3000-7000 events/sec. We’re using checkpointing to protect us from losing aggregations.

It’s been working well for a while, recovering from exceptions and even cluster restarts. We recently recompiled the code for Spark Streaming 1.6.0, only changing the library dependencies in the build.sbt file. After running the code in a Spark 1.6.0 cluster for several hours, we’ve noticed the following:

  1. “Input Rate” and “Processing Time” volatility has increased substantially (see the screenshots below) in 1.6.0.
  2. Every few hours, there’s an ‘’Exception thrown while writing record: BlockAdditionEvent … to the WriteAheadLog. java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]” exception (see complete stack trace below) coinciding with the drop to 0 events/sec for specific batches (minutes).

After doing some digging, I think the second issue looks related to this Pull Request. The initial goal of the PR: “When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don’t get blocked by the driver for too long.”

We are checkpointing in S3 in Spark 1.5.2 and there are no performance/reliability issues. We’ve tested checkpointing in Spark 1.6.0 in S3 and local NAS and in both cases we’re receiving this exception. It looks like when it takes more than 5 seconds to checkpoint a batch, this exception arises and we’ve checked that the events for that batch are lost forever.

Questions

Spark 1.5.2 Statistics - Screenshot

enter image description here

Spark 1.6.0 Statistics - Screenshot

enter image description here

Full Stack Trace

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Source Code Extract

...
     // Function to create a new StreamingContext and set it up
  def setupContext(): StreamingContext = {
    ...
    // Create a StreamingContext
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    // Create a Kinesis DStream
    val data = KinesisUtils.createStream(ssc,
      kinesisAppName, kinesisStreamName,
      kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
      InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
      StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
...
    ssc.checkpoint(checkpointDir)

    ssc
  }


  // Get or create a streaming context.
  val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)

  ssc.start()
  ssc.awaitTermination()

Upvotes: 13

Views: 2681

Answers (1)

MiguelPeralvo
MiguelPeralvo

Reputation: 877

Following zero323's suggestion about posting my comment as an answer:

Increasing spark.streaming.driver.writeAheadLog.batchingTimeout solved the checkpointing timeout issue. We did it after making sure we had room for it. We have been testing it for a while now. So I only recommend increasing it after careful consideration.

DETAILS

We used these 2 settings in $SPARK_HOME/conf/spark-defaults.conf:

spark.streaming.driver.writeAheadLog.allowBatching true spark.streaming.driver.writeAheadLog.batchingTimeout 15000

Originally, we only had spark.streaming.driver.writeAheadLog.allowBatching set to true.

Before the change, we had reproduced the issue mentioned in the question ("...ReceivedBlockTracker: Exception thrown while writing record...") in a testing environment. It occurred every few hours. After the change, the issue disappeared. We ran it for several days before moving to production.

We had found that the getBatchingTimeout() method of the WriteAheadLogUtils class had a default value of 5000ms, as seen here:

def getBatchingTimeout(conf: SparkConf): Long = {
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}

Upvotes: 5

Related Questions