Reputation: 3
I have a scenario where I am running a Spark streaming job. This is receiving data from Kafka. All I am trying to do is fetch the records from the stream and place them in local. I also implemented offset handling for it. The size of the message can be upto 5 MB. When I tried with 0.4MB - 0.6MB files, the job was running fine but when I tried running with a 1.3MB file, which is greater than the default 1MB, I am facing the following issue.
java.lang.AssertionError: assertion failed: Ran out of messages before reaching ending offset 9 for topic lms_uuid_test partition 0 start 5. This should not happen, and indicates that messages may have been lost
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
at org.apache.spark.util.NextIterator.isEmpty(NextIterator.scala:21)
at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:94)
at com.scb.BulkUpload.PortalConsumeOffset$$anonfun$createStreamingContext$1$$anonfun$apply$1.apply(PortalConsumeOffset.scala:93)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1881)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I tried adding the following as Kafka consumer properties hoping that the larger messages would be handled but no luck.
"send.buffer.bytes"->"5000000", "max.partition.fetch.bytes" -> "5000000", "consumer.fetchsizebytes" -> "5000000"
I hope someone might be able to help me out. Thanks in advance.
Upvotes: 0
Views: 3964
Reputation: 1316
fetch.message.max.bytes - this will determine the largest size of a message that can be fetched by the consumer.
Property Name: fetch.message.max.bytes
The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. The fetch request size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch.
Example: Kafka Producer sends 5 MB --> Kafka Broker Allows/Stores 5 MB --> Kafka Consumer receives 5 MB
If so, please set the value to fetch.message.max.bytes=5242880 and then try it should work.
Upvotes: 1