Reputation: 1196
I have a kafka listener configured in our Spring Boot application as follows:
@KafkaListener(topicPartitions = @TopicPartition(topic = "data.all", partitions = { "0", "1", "2" }), groupId = "kms")
public void listen(ObjectNode message) throws JsonProcessingException {
    // Code to convert to a JSON string and write to Elasticsearch
}
This application is deployed to and runs on 3 servers and, despite all instances having the group id kms, they each get a copy of the message, which means I get 3 identical records in Elasticsearch. When I'm also running an instance locally, 4 copies get written.
I've confirmed that the producer is only writing 1 message to the topic by checking the count of all messages on the topic before and after the write occurs; it only increases by 1. How can I prevent this?
Upvotes: 2
Views: 5518
Reputation: 174484
When you manually assign partitions like that, you are responsible for distributing the partitions across the instances.
The group is ignored for the purpose of partition distribution, but is still used to track offsets, if needed.
You must either use group management and let Kafka do the partition assignment for you, or manually assign a different set of partitions to each instance.
Instead of topicPartitions, use topics = "data.all".
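For reference, a minimal sketch of that group-managed version, assuming the same ObjectNode payload and kms group as in the question (the class name is illustrative). With topics, Kafka assigns each of the three partitions to exactly one member of the group, so each record is processed once:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DataAllListener {

    // Group management: the "kms" group coordinator spreads the topic's
    // partitions across the running instances, so no two instances read
    // the same partition at the same time.
    @KafkaListener(topics = "data.all", groupId = "kms")
    public void listen(ObjectNode message) throws JsonProcessingException {
        // Convert to a JSON string and write to Elasticsearch, as in the question
    }
}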
Upvotes: 4
Reputation: 106
What happens when you don't assign partitions manually:

Consumer A joins the consumer group (let's say consumer). Since A is the only member of the group consumer, it is assigned all the partitions. When consumer B tries to join the same consumer group consumer, a partition reassignment happens and both A and B get a share of the partitions to listen to messages.

What is happening in your case is that more than one consumer is explicitly listening to the same partitions, and every consumer assigned a partition receives its messages, even within the same consumer group. So the mutual exclusivity between consumers in a consumer group is lost because more than one consumer is listening to the same partitions.
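If you do keep manual assignment instead of group management, the assignments must not overlap. A minimal sketch, assuming each deployment supplies its own partition list through a hypothetical listener.partitions property (property placeholders are resolved in these annotation attributes):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class ManuallyAssignedListener {

    // Each server sets listener.partitions differently, e.g. "0" on the first,
    // "1" on the second, "2" on the third, so no two instances consume the
    // same partition and duplicates are avoided.
    @KafkaListener(
            topicPartitions = @TopicPartition(topic = "data.all",
                    partitions = "${listener.partitions}"),
            groupId = "kms")
    public void listen(ObjectNode message) throws JsonProcessingException {
        // Convert to a JSON string and write to Elasticsearch
    }
}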
Upvotes: 1