mindon

Reputation: 493

spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

Spark Streaming Problem with Kafka DirectStream:

spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096

Tried:

1) Increased spark.streaming.kafka.consumer.poll.ms

-- from 512 to 4096; fewer failures, but even at 10s the failures still occur

2) Increased executor memory from 1G to 2G

-- partly worked; far fewer failures

3) https://issues.apache.org/jira/browse/SPARK-19275

-- still fails even when all streaming batch durations are under 8s ("session.timeout.ms" -> "30000")

4) Tried Spark 2.1

-- the problem is still there


Environment: Scala 2.11.8, Kafka 0.10.0.0, Spark 2.0.2

Spark configs

.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")

using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
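For context, a minimal sketch (not my exact job) of how these configs and the Kafka consumer params, including the session.timeout.ms from workaround 3, are wired into a 0-10 direct stream. The broker address, app name, batch duration, and the println body are placeholders/assumptions; the real job writes to Cassandra via the connector.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object AStreamingJob {
  def main(args: Array[String]): Unit = {
    // Spark-side tuning from the config list above (remaining settings omitted).
    val conf = new SparkConf()
      .setAppName("a-streaming-job") // placeholder app name
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "256")
      .set("spark.streaming.kafka.consumer.poll.ms", "4096")

    val ssc = new StreamingContext(conf, Seconds(8)) // assumed batch interval

    // Kafka consumer params; session.timeout.ms raised per SPARK-19275.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-host:9092", // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "a-group",
      "session.timeout.ms" -> "30000",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("a-topic"), kafkaParams)
    )

    // Placeholder processing; the failure surfaces inside this foreachPartition.
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        records.foreach(r => println(r.value()))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}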

Error stack trace:

at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Losing over 1% of the data blocks from Kafka due to this failure :( Please help!

Upvotes: 6

Views: 9635

Answers (2)

reese

Reputation: 1

A solution that worked for me:

  • Decrease the Kafka Consumer max.partition.fetch.bytes.

Reasoning: when the consumer polls for records from Kafka, it prefetches data up to this amount. If the network is slow, it may not be able to prefetch the full amount, and the poll in the CachedKafkaConsumer will time out.
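As a reference, a minimal sketch (spark-shell style) of where this property would go, reusing the placeholder broker/group names from the question; the 262144-byte value is only illustrative (the Kafka consumer default is 1048576, i.e. 1 MB).

import org.apache.kafka.common.serialization.StringDeserializer

// Lower max.partition.fetch.bytes so each poll has less data to prefetch per partition.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka-host:9092", // placeholder broker
  "group.id" -> "a-group",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "max.partition.fetch.bytes" -> (262144: java.lang.Integer) // illustrative, default 1048576
)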

Upvotes: 0

mindon

Reputation: 493

Current solution:

  • Increase num.network.threads in kafka/config/server.properties (default is 3)
  • Increase spark.streaming.kafka.consumer.poll.ms to a large value. Without spark.streaming.kafka.consumer.poll.ms configured, Spark falls back to spark.network.timeout, which is 120s -- causing some problems
  • Optional: decrease max.poll.records (default is 500)
  • Optional: use Future{} to run time-consuming tasks in parallel (see the sketch after this list)
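A rough sketch (spark-shell style) of how these knobs fit together; all values are illustrative, and writeToCassandra/updateStats are hypothetical placeholders for the time-consuming work, not the original code.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.apache.spark.SparkConf

// Broker side (kafka/config/server.properties, not Scala):
//   num.network.threads=8   // default is 3; value is illustrative

// Spark side: set poll.ms explicitly so it does not fall back to spark.network.timeout (120s).
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "8192") // illustrative value

// Kafka consumer side: cap how many records one poll returns (default 500).
val extraKafkaParams = Map[String, Object](
  "max.poll.records" -> (250: java.lang.Integer) // illustrative value
)

// Run independent time-consuming tasks in Futures so they execute in parallel,
// then wait for both before the batch task finishes.
def writeToCassandra(records: Seq[String]): Unit = () // hypothetical slow task
def updateStats(records: Seq[String]): Unit = ()      // hypothetical slow task

val records = Seq("record-1", "record-2")
val work = Seq(Future(writeToCassandra(records)), Future(updateStats(records)))
Await.result(Future.sequence(work), 60.seconds)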

Upvotes: 3
