Reputation: 1035
I'm altering Kafka topic, adding partition to it (from 3 partitions to 4 partitions).
On consumer, I already has 4 concurrency, using @KafkaListener
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
The producer is sending message every few seconds, but usually less than 5 seconds for each message.
The funny thing is, when I altering the partition
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic t_multi_partitions --partitions 4
It does not take effect directly.
It took approximate 5 minutes before producer start sending message to the 4th partition, and also 5 minutes for rebalancing so the 4th consumer take effect
Is this normal? I'm using Spring boot 2.1 How can I adjust this 5 minutes time (either longer or shorter)?
Consumer code (simplified)
@KafkaListener(topics = "t_multi_partitions", concurrency = "4")
public void listen(ConsumerRecord<String, String> message) throws InterruptedException {
log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
}
Upvotes: 2
Views: 3101
Reputation: 1035
Based on @kkflf guidance, producer config
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> producerConfigs() {
var props = kafkaProperties.buildProducerProperties();
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "600000");
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
}
Upvotes: 0
Reputation: 2570
This is controlled by Apache-Kafka and not Spring-Kafka. The behaviour is normal.
You can modify the refresh rate by adjusting metadata.max.age.ms
. This is a producer and consumer config. The producer and consumer config are not required to have identical config value.
Quote from the Apache-Kafka documentation.
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
Spring-Kafka in your example is running four @KafkaListener
threads. The topics and partitions are delegated to each @KafkaListener
based on the assignor strategy defined by partition.assignment.strategy
. RangeAssignor
is the default strategy. This is a consumer config.
Quote from the Apache-Kafka documentation.
The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
Upvotes: 3