Reputation: 493
Spark Streaming Problem with Kafka DirectStream:
spark streaming assertion failed: Failed to get records for spark-executor-a-group a-topic 7 244723248 after polling for 4096
Tried:
1) Increased spark.streaming.kafka.consumer.poll.ms
-- from 512 to 4096; fewer failures, but they still occur even at 10s
2) Increased executor memory from 1G to 2G
-- partly worked, far fewer failures
3) Applied https://issues.apache.org/jira/browse/SPARK-19275
-- still failed even with streaming (batch) durations all under 8s ("session.timeout.ms" -> "30000")
4) Tried Spark 2.1
-- problem still there
Using Scala 2.11.8, Kafka 0.10.0.0, Spark 2.0.2
Spark configs
.config("spark.cores.max", "4")
.config("spark.default.parallelism", "2")
.config("spark.streaming.backpressure.enabled", "true")
.config("spark.streaming.receiver.maxRate", "1024")
.config("spark.streaming.kafka.maxRatePerPartition", "256")
.config("spark.streaming.kafka.consumer.poll.ms", "4096")
.config("spark.streaming.concurrentJobs", "2")
using spark-streaming-kafka-0-10-assembly_2.11-2.1.0.jar
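For context, the stream is created roughly like this (a minimal sketch only; the app name, broker address, topic, group id and batch duration are placeholders, and the session.timeout.ms override is the one from try 3 above):
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// SparkSession built with the .config(...) settings listed above
val spark = SparkSession.builder
  .appName("a-streaming-app")                              // placeholder name
  .config("spark.streaming.kafka.consumer.poll.ms", "4096")
  .getOrCreate()

val ssc = new StreamingContext(spark.sparkContext, Seconds(4))  // batch duration is a placeholder

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker-1:9092",                  // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "a-group",                                 // group from the error message
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "session.timeout.ms" -> "30000"                          // from try 3) above
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("a-topic"), kafkaParams)
)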
Error stacks:
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:194)
...
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:142)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:108)
...
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Losing more than 1% of the data blocks from Kafka because of this failure :( Please help!
Upvotes: 6
Views: 9635
Reputation: 1
A solution that worked for me: lowering max.partition.fetch.bytes.
Reasoning: when the consumer polls for records from Kafka, it prefetches data up to this amount per partition. If the network is slow, it may not be able to fetch the full amount in time, and the poll in the CachedKafkaConsumer will time out.
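For example, this setting could be lowered in the consumer parameters passed to the direct stream (a sketch; the broker address, group id and the 262144 value are examples, not tuned recommendations -- the Kafka default is 1048576):
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker-1:9092",                  // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "a-group",                                 // placeholder
  // fetch at most 256 KB per partition per poll instead of the 1 MB default
  "max.partition.fetch.bytes" -> (262144: java.lang.Integer)
)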
Upvotes: 0
Reputation: 493
Current solution:
1) Increase num.network.threads in kafka/config/server.properties (the default is 3).
2) Set spark.streaming.kafka.consumer.poll.ms to a sufficiently large value (see the sketch below).
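A rough sketch of both changes (the 8 threads and the 30000 ms timeout are example values, not tuned recommendations):
import org.apache.spark.SparkConf

// On the Kafka brokers, in kafka/config/server.properties (default is 3):
//   num.network.threads=8

// On the Spark side, give the cached consumer an explicit, large poll timeout:
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "30000")  // example value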
Without configuring spark.streaming.kafka.consumer.poll.ms, it falls back to spark.network.timeout, which is 120s -- and that causes some problems.
Upvotes: 3