sokeung

Reputation: 3

How to bind consumers to specific threads

spring-kafka version: 2.3.1

application.yml

spring:
  kafka:
    listener:
      concurrency: 3
      type: batch

When processing messages in batches, I want to bind a dedicated thread to each partition.

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"0"}))
public void listenTopicPartition(ConsumerRecord record) {
    System.out.println(record);
}

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"1"}))
public void listenTopicPartition1(ConsumerRecord record) {
    System.out.println(record);
}

@KafkaListener(groupId = "service-order", topicPartitions = @TopicPartition(topic = "order", partitions = {"2"}))
public void listenTopicPartition2(ConsumerRecord record) {
    System.out.println(record);
}

Upvotes: 0

Views: 39

Answers (1)

Artem Bilan

Reputation: 121550

With your current configuration, it already works the way you want.

Are you running into a problem with it? What prompted the question? I would keep it as you have it and verify during testing that each partition is really processed on its own thread.

With this setup you don't need concurrency: 3. Every @KafkaListener gets its own container, and a container's concurrency is distributed across the partitions assigned to it. Since each container is assigned only one partition, its runtime concurrency is always 1.
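To make the concurrency point concrete, here is a minimal plain-Java sketch of the rule as I understand it. It is a simplified model, not spring-kafka code: the class EffectiveConcurrency and the method effectiveConcurrency are hypothetical names made up for illustration. It assumes a container with explicitly assigned partitions never runs more consumer threads than it has partitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not part of spring-kafka.
public class EffectiveConcurrency {

    // Assumed rule: a container with explicitly assigned partitions cannot
    // use more consumer threads than the number of partitions it was given.
    static int effectiveConcurrency(int configured, int assignedPartitions) {
        return Math.min(configured, assignedPartitions);
    }

    public static void main(String[] args) {
        // spring.kafka.listener.concurrency from the question
        int configured = 3;

        // Listener method name -> number of partitions assigned to it
        Map<String, Integer> listeners = new LinkedHashMap<>();
        listeners.put("listenTopicPartition", 1);
        listeners.put("listenTopicPartition1", 1);
        listeners.put("listenTopicPartition2", 1);

        // Each listener has its own container, so the rule applies per listener.
        for (Map.Entry<String, Integer> e : listeners.entrySet()) {
            System.out.println(e.getKey() + " -> "
                    + effectiveConcurrency(configured, e.getValue()) + " thread(s)");
        }
    }
}
```

Under that assumption, each of the three listeners ends up with a single consumer thread regardless of the configured value, which is why the concurrency setting is redundant here.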

Upvotes: 1
