Timothy
Timothy

Reputation: 1035

Adding partition on Spring boot kafka take some time to effective

I'm altering Kafka topic, adding partition to it (from 3 partitions to 4 partitions). On consumer, I already has 4 concurrency, using @KafkaListener

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")

The producer is sending message every few seconds, but usually less than 5 seconds for each message.

The funny thing is, when I altering the partition

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4

It does not take effect directly.
It took approximate 5 minutes before producer start sending message to the 4th partition, and also 5 minutes for rebalancing so the 4th consumer take effect

Is this normal? I'm using Spring boot 2.1 How can I adjust this 5 minutes time (either longer or shorter)?

Consumer code (simplified)

@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
    log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}

Upvotes: 2

Views: 3101

Answers (2)

Timothy
Timothy

Reputation: 1035

Based on @kkflf guidance, producer config

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        var props = kafkaProperties.buildProducerProperties();
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");

        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

}

Upvotes: 0

kkflf
kkflf

Reputation: 2570

This is controlled by Apache-Kafka and not Spring-Kafka. The behaviour is normal.

You can modify the refresh rate by adjusting metadata.max.age.ms. This is a producer and consumer config. The producer and consumer config are not required to have identical config value.

Quote from the Apache-Kafka documentation.

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

Spring-Kafka in your example is running four @KafkaListener threads. The topics and partitions are delegated to each @KafkaListener based on the assignor strategy defined by partition.assignment.strategy. RangeAssignor is the default strategy. This is a consumer config.

Quote from the Apache-Kafka documentation.

The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used

Upvotes: 3

Related Questions