karthick

Reputation: 6178

How to handle OffsetOutOfRangeException Error?

I am using Storm and Kafka for real-time data analysis.

I am getting the following error in the spout:

Error

kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99)
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104)
at kafka.message.MessageSet.size(MessageSet.scala:87)
at storm.kafka.PartitionManager.fill(PartitionManager.java:113)
at storm.kafka.PartitionManager.next(PartitionManager.java:83)
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106)
at backtype.storm.daemon.executor$fn__3430$fn__3445$fn__3474.invoke(executor.clj:547)
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)

Note:

  1. startOffsetTime of the spout is -1.
  2. Storm version - 0.9.0
  3. Kafka version - 0.7.2

How to fix this issue?

Any suggestions would be appreciated.

Upvotes: 3

Views: 2386

Answers (1)

Amol M Kulkarni

Reputation: 21639

kafka.common.OffsetOutOfRangeException

This generally indicates that the client has requested an offset that is no longer available on the server.

This can happen when the message at the requested offset no longer exists in the topic log because it has been removed according to the retention policy in your Kafka config.

The following is a sample of the relevant broker config (review it and tune these settings to values that are optimal for your setup):

############################# Log Retention Policy #############################    
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.    
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
log.cleanup.interval.mins=1

Note: Kafka automatically deletes messages from the log files according to this config, while consumers keep their partition offsets in ZooKeeper (say the stored offset is currently 3000). Once Kafka has finished cleaning up old segments, the range of offsets still held by the broker may no longer include that stored offset. The problem then occurs when the consumer reads the offset back from ZooKeeper (3000 again) and uses it to fetch messages that no longer exist on the broker, which raises this exception. So the solution is to tune the retention/cleanup settings so that messages stay around long enough for your consumers.
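If you also want the spout to recover on its own instead of repeatedly requesting the stale offset, you can tell it to ignore the offset stored in ZooKeeper and restart from an offset the broker still has. Below is a minimal sketch assuming the classic storm.kafka SpoutConfig API; the broker list, topic, and ZooKeeper paths are placeholders, and constructor/field names vary slightly between storm-kafka versions:

import com.google.common.collect.ImmutableList;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;

public class ClicksSpoutFactory {
    public static KafkaSpout buildSpout() {
        // Placeholder brokers, topic and ZooKeeper paths -- replace with your own values.
        SpoutConfig spoutConfig = new SpoutConfig(
                ImmutableList.of("kafkahost1", "kafkahost2"), // Kafka brokers
                8,               // number of partitions per host
                "clicks",        // topic to read from
                "/kafkastorm",   // ZooKeeper root where the spout stores consumer offsets
                "discovery");    // id for this consumer under that root

        // Deserialize messages as strings (newer storm-kafka versions wrap this
        // in a SchemeAsMultiScheme instead).
        spoutConfig.scheme = new StringScheme();

        // Ignore the (possibly stale) offset stored in ZooKeeper and restart from the
        // earliest offset the broker still has (-2); -1 starts from the latest message.
        spoutConfig.forceStartOffsetTime(-2);

        return new KafkaSpout(spoutConfig);
    }
}

With startOffsetTime left at -1, the spout typically uses the latest offset only when there is no offset stored in ZooKeeper yet; afterwards it trusts the stored offset, which is exactly what goes wrong once retention has deleted those messages.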


Upvotes: 1
