Yogesh Bombe

Reputation: 325

java.nio.channels.ClosedChannelException while Consuming message from storm spout

I have written a Storm topology that fetches data from Kafka using the Kafka spout. It runs well in my local environment, but on the cluster I get the following error:

2018-05-16 18:25:59.358 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Refreshing partition manager connections
2018-05-16 18:25:59.359 o.a.s.k.DynamicBrokersReader Thread-25-kafkaSpout-executor[20 20] [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=data-ops, partitionMap={0=uat-datalake-node2.org:6667}}
2018-05-16 18:25:59.359 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] assigned [Partition{host=uat-datalake-node2.org:6667, topic=data-ops, partition=0}]
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Deleted partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] New partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Finished refreshing
2018-05-16 18:25:59.361 k.c.SimpleConsumer Thread-25-kafkaSpout-executor[20 20] [INFO] Reconnect due to error: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.372 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [WARN] Network error when fetching messages: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.373 o.a.s.k.KafkaSpout Thread-25-kafkaSpout-executor[20 20] [WARN] Fetch failed org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
    at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
    at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.1.jar:1.0.1]
    ... 7 more

Upvotes: 0

Views: 1248

Answers (1)

Stig Rohde Døssing

Reputation: 3651

Looks like you got a timeout when the Storm worker tried to read from the Kafka broker. Maybe the connection between them is flaky or slow?

That said, the stack trace seems to say that the consumer has reconnected, so if this only happens rarely you might have just had a hiccup in the connection between the worker and Kafka.

If this happens regularly and you're sure the connection is stable, I'd try asking over on the Kafka mailing list at https://kafka.apache.org/contact. If you post your issue and which Kafka version you're using, they might be able to tell you whether there are issues that could cause a socket timeout for the consumer.
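
To check whether the connection from a worker host to the broker is the problem, a plain TCP probe run on that host can help. The following is only a minimal sketch using the JDK's own socket API: `BrokerProbe` and `canConnect` are hypothetical names made up for this example and are not part of Storm or Kafka, and the 5000 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Storm or Kafka.
public class BrokerProbe {

    // Returns true if a TCP connection to host:port is established within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, unresolved host names, and connect timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("usage: java BrokerProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // Time the attempt so a slow connection is visible as well as a failed one.
        long start = System.nanoTime();
        boolean ok = canConnect(host, port, 5000); // 5000 ms is an assumed timeout
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(host + ":" + port
                + (ok ? " reachable" : " NOT reachable")
                + " after " + elapsedMs + " ms");
    }
}
```

Run it on the machine where the Storm worker runs, with the broker host and port from your log, for example `java BrokerProbe uat-datalake-node2.org 6667`. A successful connect only shows that the port is reachable from that host; it does not rule out a broker that accepts connections but is slow to answer fetch requests.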

Upvotes: 0
