In the past I made a wrong design decision: I operated multiple consumers (with the same group id) in a single thread. At the time there was no usage scenario that exercised this, so the problem never surfaced.
Now my program goes wrong when I have multiple consumers. After looking up the relevant information and running my own tests, I found these three scenarios.
I think I know how to fix it, but I don't understand why it happens: what state does a Kafka consumer keep in its thread that leads to this behavior?
My code looks like this:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Configuration shared by all consumers in the group (bootstrap.servers, which is required, is not shown here)
private static Properties consumerProperties() {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "shoothzj.group");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return properties;
}
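private Map<String, KafkaConsumer<String, T>> consumerMap = new HashMap<>();

// Every consumer in the map (all with the same group id) is polled in turn from this one thread
for (KafkaConsumer<String, T> consumer : consumerMap.values()) {
    ConsumerRecords<String, T> records = consumer.poll(1000);
    // handle the records
}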
You can't have multiple consumers with the same group id in the same thread. Below is an excerpt from the book Kafka: The Definitive Guide:
You can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java’s ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.
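As a rough illustration of the pattern the excerpt describes, here is a stdlib-only sketch. The class `ConsumerPerThread` and its `runAll` method are hypothetical names, not part of the Kafka API. Each task submitted to the `ExecutorService` creates its own consumer, uses it, and closes it, so no consumer instance is ever shared between threads. The type parameter `C` stands in for the consumer; since `KafkaConsumer` implements `Closeable`, a factory such as `() -> new KafkaConsumer<>(properties)` should fit, although a real worker would typically loop on `poll()` until shut down instead of returning a single result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical helper illustrating "one consumer per thread": every worker
 * thread creates, uses and closes its own consumer instance.
 */
public final class ConsumerPerThread {

    private ConsumerPerThread() {
    }

    /**
     * Starts numThreads workers. Each worker builds a fresh consumer with
     * factory and passes it to work; the consumer never leaves that worker's
     * thread. Returns one result per worker, in submission order.
     */
    public static <C extends AutoCloseable, R> List<R> runAll(
            int numThreads, Supplier<C> factory, Function<C, R> work) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (int i = 0; i < numThreads; i++) {
                futures.add(pool.submit(() -> {
                    // Created inside the task, so it is confined to this worker thread
                    try (C consumer = factory.get()) {
                        return work.apply(consumer);
                    }
                }));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // waits for that worker to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Creating the consumer inside the task, rather than building all consumers up front and handing them out, is what guarantees the confinement: the instance is only reachable from the thread that made it.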
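We ran into a similar issue in one of our designs and learned this lesson the hard way. If I remember the root cause correctly, it went like this. Say there are two consumers in the same thread. If processing the records from the first consumer's poll takes a long time, the second consumer cannot call poll() in the meantime. In older clients the heartbeat to the group coordinator was sent from inside poll(), so the second consumer would miss its heartbeats; since Kafka 0.10.1 heartbeats are sent from a background thread, but the same effect comes from max.poll.interval.ms (30000 ms in the config above), because a consumer that does not call poll() within that interval leaves the group. Either way a rebalance is triggered, and since every consumer must call poll() to rejoin the group while they all share one thread, this causes a deadlock-like situation: both consumers get stuck waiting for the rebalance to finish.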
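A back-of-the-envelope way to see why the single-thread loop breaks: after a consumer's poll() returns, the thread has to process that batch and then poll and process every other consumer before it comes back to this one. This sketch (the class `PollIntervalCheck` and its methods are hypothetical, and time spent inside poll() itself is ignored) computes that gap and compares it with max.poll.interval.ms.

```java
import java.util.List;

/**
 * Hypothetical helper: estimates how long each consumer waits between two
 * poll() calls when several consumers are polled in turn from ONE thread.
 * Time spent inside poll() itself is ignored.
 */
public final class PollIntervalCheck {

    private PollIntervalCheck() {
    }

    /**
     * After consumer i's poll() returns, the thread processes consumer i's
     * batch, then polls and processes every other consumer before reaching
     * consumer i again, so the gap is the sum of all processing times.
     */
    public static long gapBetweenPollsMs(List<Long> processingMsPerConsumer) {
        long total = 0;
        for (long ms : processingMsPerConsumer) {
            total += ms;
        }
        return total;
    }

    /**
     * True if that gap exceeds max.poll.interval.ms, in which case the
     * consumer leaves the group and a rebalance is triggered.
     */
    public static boolean exceedsMaxPollInterval(List<Long> processingMsPerConsumer,
                                                 long maxPollIntervalMs) {
        return gapBetweenPollsMs(processingMsPerConsumer) > maxPollIntervalMs;
    }
}
```

For example, with the question's max.poll.interval.ms of 30000 ms, two consumers that each spend 20 s on a batch are individually under the limit, yet each one waits about 40 s between its own polls.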