Reputation: 659
I have the code below:
@KafkaListener(
    topics = {
        "#{@topic1}",
        "#{@topics}"
    },
    containerFactory = "kafkaTopicListenerContainerFactory"
)
public void onNewConsumerRecords(List<ConsumerRecord<String, byte[]>> records, Acknowledgment acknowledgement) {
    // Some logic
}
Questions:
Upvotes: 0
Views: 622
Reputation: 174574
It will normally contain records from multiple topics/partitions.
See the documentation:
When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one topic or partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.
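To make the arithmetic in that passage concrete, here is a rough plain-Java simulation of the two strategies. It is only a sketch of the distribution logic, not Kafka's actual assignor code: the class name AssignorSketch, its methods, and the topic names are hypothetical, and it assumes every consumer subscribes to every topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of how partitions end up spread over consumers.
public class AssignorSketch {

    // Range-style: every topic is split on its own across the same ordered consumer list,
    // so the first consumers in the list receive a partition from each topic.
    public static List<List<String>> rangeAssign(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        for (String topic : topics) {
            int perConsumer = partitionsPerTopic / consumers;
            int withExtra = partitionsPerTopic % consumers;
            for (int c = 0; c < consumers; c++) {
                int start = perConsumer * c + Math.min(c, withExtra);
                int length = perConsumer + (c < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(c).add(topic + "-" + p);
                }
            }
        }
        return assignment;
    }

    // Round-robin-style: all topic-partitions are laid out in one list and dealt out in turn.
    public static List<List<String>> roundRobinAssign(List<String> topics, int partitionsPerTopic, int consumers) {
        List<List<String>> assignment = emptyAssignment(consumers);
        int next = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assignment.get(next % consumers).add(topic + "-" + p);
                next++;
            }
        }
        return assignment;
    }

    // Counts consumers that received at least one partition.
    public static long activeConsumers(List<List<String>> assignment) {
        return assignment.stream().filter(parts -> !parts.isEmpty()).count();
    }

    private static List<List<String>> emptyAssignment(int consumers) {
        List<List<String>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topicA", "topicB", "topicC");
        // prints "range: 5 active"
        System.out.println("range: " + activeConsumers(rangeAssign(topics, 5, 15)) + " active");
        // prints "round robin: 15 active"
        System.out.println("round robin: " + activeConsumers(roundRobinAssign(topics, 5, 15)) + " active");
    }
}
```

With three topics of five partitions and 15 consumers, the range-style split gives the first five consumers one partition from each topic and leaves the rest empty, while the round-robin deal gives each consumer exactly one topic-partition.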
When using Spring Boot, you can set the strategy as follows:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
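Without Spring Boot, the same property would go into the map handed to the DefaultKafkaConsumerFactory. The sketch below only builds that map, using literal property keys so it compiles with the JDK alone; the class name ConsumerPropsSketch, the broker address, and the group id are placeholders, and the deserializers are assumed from the String and byte[] types in the question.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles consumer properties for a RoundRobinAssignor setup.
public class ConsumerPropsSketch {

    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumed from the listener signature: String keys, byte[] values.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Same setting as the Spring Boot property shown above.
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id, for illustration only.
        Map<String, Object> props = consumerProps("localhost:9092", "example-group");
        System.out.println(props.get("partition.assignment.strategy"));
    }
}
```

In an application with spring-kafka on the classpath, this map could then be passed to new DefaultKafkaConsumerFactory<>(props), and that factory set on the container factory named in the listener.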
Upvotes: 1