Ahmed Salah
Ahmed Salah

Reputation: 11

Kafka: Multiple instances in the same consumer group listening to the same partition inside for topic

I have two instances of kafka consumer, configured with the same consumer group and listening to partition 0 in the same topic. The problem is when I send a message to the topic. The message is consumed by both instances which supposed not to happen as they are in the same group. I am using Spring Boot configuration class to configure them.

Here is the configuration:

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

Here is the listener:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "${kafka.topic.orders}", partitions = "0")})
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

    log.info("message received at " + orderTopic + "at partition 0");
    processRecord(record, acknowledgment);
}

Upvotes: 1

Views: 2974

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

Kafka doesn't work like that; when you manually assign partitions like that (@TopicPartition) you are explicitly telling Kafka you want to receive messages from that partition - the consumer assign() s the partitions to itself.

In other words, with manual assignment, you are taking responsibility for distributing the partitions.

You need use group management, and let Kafka assign the topics to the instances.

use topics = "..." and Kafka will do the assignment. If you don't have enough topics, instances will be idle. You need at least as many partitions as instances to have all instances participate.

Upvotes: 4

Related Questions