Javier
Javier

Reputation: 111

Long and consistent wait between tasks in spark streaming job

I have a spark streaming job running on Mesos. All its batches take the exact same time and this time is much longer than expected. The jobs pull data from kafka, process the data and insert it into cassandra and again back to kafka into a different topic.

Each batch (below) has 3 jobs, 2 of them pull from kafka, process and insert into cassandra, and the other one pulls from kafka, processes and pushes back into kafka.

I inspected the batch in the spark UI and found that they all take the same time (4s) but drilling down more, they actually process for less than a second each but they all have a gap of the same time (around 4 seconds). Adding more executors or more processing power doesn't look like it will make a difference.

Details of batch: Processing time = 12s & total delay = 1.2 s ??

So I drill down into each job of the batch (they all take the exact same time = 4s even if they are doing different processing):

Job 175s

Job 1753

Job 1754

They all take 4 seconds to run one of their stage (the one that reads from kafka). Now I drill down into the stage of one of them (they are all very similar):

Details for stage 2336

Why this wait? The whole thing actually only takes 0.5s to run, it is just waiting. Is it waiting for Kafka?

Has anyone experienced anything similar? What could I have coded wrong or configured incorrectly?

EDIT:

Here is a minimum code that triggers this behaviour. This makes me think that it must be the setup somehow.

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Even if all the executors are in the same node (spark.executor.cores=2 spark.cores.max=2), the problem is still there and it is exactly 4 seconds as before: One mesos executor

Even if the topic has no messages (batch of 0 records), spark streaming takes 4 seconds for every batch.

The only way that I have been able to fix this is by setting cores=1 and cores.max=1 so that it only creates one task to execute.

This task has locality NODE_LOCAL. So it seems that when NODE_LOCAL the execution is instantaneous but when Locality is ANY it takes 4 seconds to connect to kafka. All the machines are in the same 10Gb network. Any idea why this would be?

Upvotes: 3

Views: 2993

Answers (1)

Javier
Javier

Reputation: 111

The problem was with spark.locality.wait, this link gave me the idea

Its default value is 3 seconds and it was taking this whole time for every batch processed in spark streaming.

I have set it to 0 seconds when submitting the job with Mesos (--conf spark.locality.wait=0) and everything now runs as expected.

Upvotes: 6

Related Questions