Deibys
Deibys

Reputation: 659

Behavior of KafkaListener listening to several topics

I have bellow code:

KafkaListener(
        topics = {
                "#{@topic1}",
                "#{@topics}"
        },
        containerFactory = "kafkaTopicListenerContainerFactory"
)
public void onNewConsumerRecords(List<ConsumerRecord<String, byte[]>> records, Acknowledgment acknowledgement) {
    // Some logic
}

Questions:

  1. Will the variable records during a poll or a execution of the method onNewConsumerRecords. , contain records from both topics or only one topic ?
  2. What will be default topic assignment strategy ? if it is range and I have concurrency set to 4 , and topics don't have the same number of partitions , will it be possible some threads will be idle or underused ?

Upvotes: 0

Views: 622

Answers (1)

Gary Russell
Gary Russell

Reputation: 174574

It will normally contain records from multiple topics/partitions.

See the documentation:

https://docs.spring.io/spring-kafka/docs/current/reference/html/#using-ConcurrentMessageListenerContainer

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.

When using Spring Boot, you can assign set the strategy as follows:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

Upvotes: 1

Related Questions