Aaquib Khwaja

Reputation: 594

SparkStreaming+Kafka: Failed to get records after polling for 60000

I'm doing Spark Streaming over Kafka. The streaming job starts fine and runs for a few hours before it runs into the following issue:

17/05/18 03:44:47 ERROR Executor: Exception in task 8.0 in stage 1864.0 (TID 27968)
java.lang.AssertionError: assertion failed: Failed to get records for spark-executor-c10f4ea9-a1c6-4a9f-b87f-8d6ff66e10a5 madlytics-rt_1 3 1150964759 after polling for 60000
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Also, I increased the values of heartbeat.interval.ms, session.timeout.ms and request.timeout.ms as suggested here: https://issues.apache.org/jira/browse/SPARK-19275

Given below are some relevant configs:

batch.interval = 60s
spark.streaming.kafka.consumer.poll.ms = 60000
session.timeout.ms = 60000 (default: 30000)
heartbeat.interval.ms = 6000 (default: 3000)
request.timeout.ms = 90000 (default: 40000)

Also, the Kafka cluster has 5 nodes, and the topic I'm reading from has 15 partitions. Some other Kafka broker configs are listed below:

num.network.threads=8
num.io.threads=8
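For context, this is roughly how the consumer-side settings above are wired into the streaming job. This is a minimal sketch, not the exact job: the topic name madlytics-rt_1 is taken from the stack trace, but the app name, group.id, and broker addresses are placeholders.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Spark-side poll timeout and 60s batch interval, matching the configs above
val conf = new SparkConf()
  .setAppName("madlytics-streaming") // placeholder app name
  .set("spark.streaming.kafka.consumer.poll.ms", "60000")
val ssc = new StreamingContext(conf, Seconds(60))

// Kafka consumer params, mirroring the timeout values listed in the question
val kafkaParams = Map[String, Object](
  "bootstrap.servers"     -> "broker1:9092,broker2:9092", // placeholder brokers
  "key.deserializer"      -> classOf[StringDeserializer],
  "value.deserializer"    -> classOf[StringDeserializer],
  "group.id"              -> "madlytics-rt",              // placeholder group id
  "session.timeout.ms"    -> "60000",
  "heartbeat.interval.ms" -> "6000",
  "request.timeout.ms"    -> "90000"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("madlytics-rt_1"), kafkaParams)
)

ssc.start()
ssc.awaitTermination()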

Any help will be much appreciated. Thanks.

Upvotes: 4

Views: 6456

Answers (2)

neel

Reputation: 304

I solved the issue with a simple configuration change that seems obvious in hindsight, but it took me some time to realize how such a default (mis)configuration could go unnoticed.

The primary issue is that the Spark setting spark.streaming.kafka.consumer.poll.ms (default 512ms in KafkaRDD, falling back to spark.network.timeout, default 120s, when not set) ends up lower than the Kafka consumer's request.timeout.ms (default 305000ms in the new Kafka consumer API). Hence Spark's poll always times out before the Kafka consumer's own request/poll does, which shows up whenever no records are momentarily available in the Kafka topic.

Simply increasing spark.streaming.kafka.consumer.poll.ms to a value greater than Kafka's request.timeout.ms should do the trick. Also keep the Kafka consumer's max.poll.interval.ms below request.timeout.ms.
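For reference, a minimal sketch of what that relationship between the two settings might look like; the exact values below are illustrative, not prescriptive, and should be tuned to your own cluster.

// spark.streaming.kafka.consumer.poll.ms must exceed the consumer's request.timeout.ms
val conf = new org.apache.spark.SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "320000") // > request.timeout.ms below

val kafkaParams = Map[String, Object](
  "request.timeout.ms"   -> "305000", // new-consumer default
  "max.poll.interval.ms" -> "300000"  // keep below request.timeout.ms
)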

Q.E.D and Good luck.

Upvotes: 3

reim

Reputation: 612

In my experience this particular failure is a symptom of an overloaded Kafka cluster. The usual suspects are GC stop-the-world pauses and starved threads.

On the surface everything may look fine with Kafka, but that isn't necessarily the case.

Is the cluster spending a lot of time rebalancing after you added a partition? Or is it maintaining a huge offsets topic because of all the load tests you ran?

What happened to me once was that the cluster looked fine on the surface, but this timeout kept popping up here and there. On a brand new, even smaller, cluster the issue disappeared.

Upvotes: 0
