TeeKai

Reputation: 691

Kafka Consumer - receiving messages Inconsistently

I can send and receive messages on the command line against a local Kafka installation. I can also send messages through Java code, and those messages show up in the Kafka console consumer. I also have Java code for a Kafka consumer. The code received messages yesterday, but it doesn't receive any messages this morning, even though the code has not been changed. I am wondering whether the property configuration is quite right or not. Here is my configuration:

The Producer:

bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()

and the ProducerRecord is set as

ProducerRecord<String, String>("test", "mykey",  "myvalue")

The Consumer:

zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer

and for Java code:

    String topic = "test";

    Map<String, Integer> topicCount = new HashMap<>();
    topicCount.put(topic, 1);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
            consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

What is missing?

Upvotes: 0

Views: 1747

Answers (1)

alexlod

Reputation: 1365

A number of things could be going on.

First, your consumer's ZooKeeper session timeout is very low, which means the consumer may be experiencing many "soft failures" due to garbage collection pauses. When that happens, the consumer group rebalances, which can pause consumption. And if it happens very frequently, the consumer can get into a state where it never consumes messages because it's constantly being rebalanced. I suggest increasing the ZooKeeper session timeout to 30 seconds to see if this resolves the issue. If it does, you can experiment with setting it lower again.
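For reference, here's a minimal sketch of that change with the old high-level consumer you're already using (property names taken from your config; the 30-second value is just a starting point):

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerSetup {
        public static ConsumerConnector create() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "test");
            // 30 seconds instead of 500 ms: a GC pause no longer looks like a
            // dead consumer, so the group isn't constantly rebalancing.
            props.put("zookeeper.session.timeout.ms", "30000");
            props.put("zookeeper.sync.time.ms", "250");
            props.put("auto.commit.interval.ms", "1000");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }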

Second, can you confirm new messages are being produced to the "test" topic? Your consumer group will only consume messages beyond its last committed offset, so it's possible the topic simply doesn't have any new messages.
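One way to rule that out (a sketch, assuming you stay on the old consumer; the "test-debug" group id here is made up for debugging) is to start a throwaway consumer with a group.id that has never committed offsets and auto.offset.reset set to smallest, which makes it read the topic from the beginning:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class DebugConsumer {
        public static ConsumerConnector create() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            // A group id that has never committed offsets for this topic
            props.put("group.id", "test-debug");
            // With no committed offsets, "smallest" starts from the beginning
            // of the topic instead of waiting for new messages.
            props.put("auto.offset.reset", "smallest");
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }

If a consumer built this way sees the old messages but your normal consumer still sees nothing new, the producer side is the place to look.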

Third, do you have other consumers in the same consumer group that could be processing the messages? If one consumer is experiencing frequent soft failures, other consumers will be assigned its partitions.

Finally, you're using the "old" consumer, which will eventually be removed. If possible, I suggest moving to the "new" consumer (KafkaConsumer.java), which was made available in Kafka 0.9, although I can't promise this will resolve your issue.
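For what it's worth, a minimal sketch of the equivalent setup with the new consumer (reusing your existing bootstrap.servers, group.id, and deserializer settings) would look something like this:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            // Note: no ZooKeeper settings -- the new consumer talks to the brokers directly.

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }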

Hope this helps.

Upvotes: 1
