Reputation: 1675
I am using Storm 0.9.4 with storm-kafka:0.9.0-wip16a-scala292 as the dependency to read from Kafka 0.7.
As soon as I start the topology, within a few minutes I get the error below:
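For reference, the spout is wired into the topology roughly like this (a minimal sketch; the ZooKeeper connect string, topic, ids, and parallelism are placeholders, not my real values, and the ZkHosts/SpoutConfig constructors may differ slightly in the wip16a build):

```java
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaConfig;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;

public class EventTopology {
    public static TopologyBuilder build() {
        // Placeholder ZK connect string and broker path -- not the real values.
        KafkaConfig.ZkHosts hosts = new KafkaConfig.ZkHosts("zk1:2181,zk2:2181", "/brokers");

        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,          // where to discover the Kafka 0.7 brokers
                "events",       // topic (placeholder)
                "/kafka-spout", // ZK root under which the spout keeps its committed offsets
                "event-spout"); // id of this consumer's offset node in ZK

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 4);
        // ... bolts attached here
        return builder;
    }
}
```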
kafka.common.OffsetOutOfRangeException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
2015-04-30T01:49:15.118-0500 backtype.storm.daemon.executor [ERROR] kafka.common.OffsetOutOfRangeException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
2015-04-30T01:49:15.129-0500 backtype.storm.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
I am not able to figure out why this is happening. Any help or pointers on how to track down the problem would be appreciated.
Upvotes: 0
Views: 1071
Reputation: 1675
There was a problem in the way the spout was configured. We use custom properties to initialize the SpoutConfig object, and we always set both forceFromStart and startOffsetTime (to latest). The property for the latter was configured under the wrong key, so the ZooKeeper entry for a spout sometimes pointed at an earlier offset that no longer existed in Kafka, or at an offset that was present when the topology started but was removed from Kafka before Storm could work through the backlog. Since we did not need to support that scenario anyway, we simply corrected the configuration and it is now working.
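In code terms, the fix boiled down to reading the offset setting under the right property key. The sketch below illustrates the idea; the property key names and helper class are made up for illustration, and it assumes the public forceFromStart/startOffsetTime fields exposed by this storm-kafka version:

```java
import java.util.Properties;
import storm.kafka.SpoutConfig;

public class SpoutConfigurer {
    // -1L is Kafka's "latest offset" sentinel (what kafka.api.OffsetRequest's
    // LatestTime constant resolves to).
    private static final long LATEST_TIME = -1L;

    public static void applyOffsetPolicy(SpoutConfig spoutConfig, Properties props) {
        // Start from the configured offset time instead of whatever is stored in ZK.
        spoutConfig.forceFromStart =
                Boolean.parseBoolean(props.getProperty("spout.force.from.start", "true"));

        // The bug we hit: this value was read under a wrong key, so the intended
        // "latest" setting never took effect and the spout sometimes resumed from
        // an offset Kafka had already purged -> OffsetOutOfRangeException.
        spoutConfig.startOffsetTime =
                Long.parseLong(props.getProperty("spout.start.offset.time",
                        String.valueOf(LATEST_TIME)));
    }
}
```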
Upvotes: 1