Reputation: 325
I have written a Storm topology that fetches data from Kafka using the Kafka spout. It runs fine in my local environment, but in the cluster I get the following error:
2018-05-16 18:25:59.358 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Refreshing partition manager connections
2018-05-16 18:25:59.359 o.a.s.k.DynamicBrokersReader Thread-25-kafkaSpout-executor[20 20] [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=data-ops, partitionMap={0=uat-datalake-node2.org:6667}}
2018-05-16 18:25:59.359 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] assigned [Partition{host=uat-datalake-node2.org:6667, topic=data-ops, partition=0}]
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Deleted partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] New partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Finished refreshing
2018-05-16 18:25:59.361 k.c.SimpleConsumer Thread-25-kafkaSpout-executor[20 20] [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.372 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [WARN] Network error when fetching messages:
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.373 o.a.s.k.KafkaSpout Thread-25-kafkaSpout-executor[20 20] [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.1.jar:1.0.1]
    ... 7 more
Upvotes: 0
Views: 1248
Reputation: 3651
It looks like the Storm worker hit a socket timeout when it tried to read from the Kafka broker, which could mean the connection between them is flaky or slow.
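One low-risk thing to try is giving the spout's consumer more time before it gives up. The old storm-kafka SimpleConsumer uses a 10-second socket timeout by default, which roughly matches the gap between the reconnect at 18:25:59 and the timeout at 18:26:09 in your log. Here is a minimal sketch of where those knobs live when you build the spout; the ZooKeeper address, zkRoot and spout id are placeholders, not values taken from your topology:

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;

    // Placeholder ZooKeeper connect string, zkRoot and spout id -- use your own values.
    BrokerHosts hosts = new ZkHosts("uat-datalake-node1.org:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "data-ops", "/kafka-spout", "kafkaSpout");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    // Both default to 10000 ms in storm-kafka 1.x; raising them gives a slow broker
    // or network more headroom before the fetch is abandoned.
    spoutConfig.socketTimeoutMs = 60000;  // how long SimpleConsumer waits on the socket
    spoutConfig.fetchMaxWait = 10000;     // how long the broker may hold a fetch request

    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

If the timeout simply moves from 10 to 60 seconds, the bottleneck is the network or the broker rather than the spout configuration.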
That said, the log shows the consumer reconnecting after the error, so if this only happens occasionally it may just have been a transient hiccup in the connection between the worker and Kafka.
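If you want to rule out a basic reachability problem first, you could run a small standalone check from the Storm worker host using the same SimpleConsumer API the old spout uses. This is only a sketch: the broker host, port and topic are copied from your log, and the client id is made up.

    import java.util.Collections;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class BrokerReachabilityCheck {
        public static void main(String[] args) {
            // Broker host/port and topic copied from the log output; run this on the worker node.
            SimpleConsumer consumer = new SimpleConsumer(
                    "uat-datalake-node2.org", 6667,
                    10000,                  // socket timeout in ms, same as the spout's default
                    64 * 1024,              // buffer size
                    "reachability-check");  // arbitrary client id
            try {
                TopicMetadataRequest request =
                        new TopicMetadataRequest(Collections.singletonList("data-ops"));
                TopicMetadataResponse response = consumer.send(request);
                for (TopicMetadata metadata : response.topicsMetadata()) {
                    System.out.println("topic=" + metadata.topic()
                            + " partitions=" + metadata.partitionsMetadata().size());
                }
            } finally {
                consumer.close();
            }
        }
    }

If this metadata request also hangs or throws ClosedChannelException when run on the worker host, the problem sits between the worker and the broker (firewall, DNS, or the broker's advertised host name) rather than in the topology itself.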
If it happens regularly and you're sure the connection is stable, I'd ask on the Kafka mailing list (https://kafka.apache.org/contact). If you describe the issue and mention which Kafka version you're using, they may be able to tell you whether there are known issues that could cause a socket timeout on the consumer.
Upvotes: 0