Samriang

Reputation: 443

Apache Flume: kafka.consumer.ConsumerTimeoutException

I'm trying to build a pipeline with Apache Flume: spooldir -> Kafka channel -> HDFS sink

Events reach the Kafka topic without problems and I can see them with kafkacat, but the Kafka channel can't deliver them to HDFS via the sink. The error is:

Timed out while waiting for data to come from Kafka

Full log:

2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 0ms

2016-02-26 18:25:19,127 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 1ms

2016-02-26 18:25:21,129 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] Got ping response for sessionid: 0x2524a81676d02aa after 0ms

2016-02-26 18:25:21,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:327)] Timed out while waiting for data to come from Kafka
kafka.consumer.ConsumerTimeoutException
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)

My Flume config is:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c2

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.channels.c2.type   = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181

# Bind the source and sink to the channel
a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2

With a memory channel instead of the Kafka channel, everything works fine.

Thanks for any ideas in advance!

Upvotes: 0

Views: 1036

Answers (3)

M.W.Zheng

Reputation: 1

I read Flume's source code and found that Flume reads the value of the key "timeout" and passes it on as "consumer.timeout.ms".

So you can set "consumer.timeout.ms" like this:

agent1.channels.kafka_channel.timeout=-1
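Applied to the asker's configuration above (agent `a1`, channel `c2`), the override would look like the fragment below. This assumes, as described, that the channel's bare `timeout` key is passed through to the Kafka consumer as `consumer.timeout.ms`; `-1` makes the consumer block indefinitely instead of throwing:

```
# Hypothetical override for the asker's agent/channel names;
# -1 disables the consumer timeout so ConsumerTimeoutException is never thrown
a1.channels.c2.timeout = -1
```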

Upvotes: 0

r2d2

Reputation: 41

Kafka's ConsumerConfig class has a "consumer.timeout.ms" configuration property, which defaults to -1. Any new Kafka consumer is expected to override the property with a suitable value.

Below is the relevant entry from the Kafka documentation:

consumer.timeout.ms (default: -1)
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.

When Flume creates a Kafka channel, it sets consumer.timeout.ms to 100, as seen in the Flume logs at the INFO level. That explains why we see so many of these ConsumerTimeoutExceptions.

 level: INFO Post-validation flume configuration contains configuration for agents: [agent]
 level: INFO Creating channels
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Group ID was not specified. Using flume as the group id.
 level: INFO {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume, 
              zookeeper.connect=zookeeper:2181, **consumer.timeout.ms=100**, auto.commit.enable=false}
 level: INFO Created channel c1

Going by the Flume user guide's Kafka channel settings, I tried to override this value with the property below, but that doesn't seem to work:

agent.channels.c1.kafka.consumer.timeout.ms=5000

Also, we ran a load test that pushed data through the channel constantly, and this exception didn't occur during the tests.

Upvotes: 0

GUO Songchu

Reputation: 26

ConsumerTimeoutException means no new message has arrived for a while; it does not mean the connection to Kafka timed out.

http://kafka.apache.org/documentation.html

consumer.timeout.ms (default: -1)
Throw a timeout exception to the consumer if no message is available for consumption after the specified interval.

Upvotes: 0
