Hirein

Reputation: 235

How to create multiple consumers in a consumer group using the Spring-provided Kafka APIs

I am trying to create multiple consumers in a consumer group for parallel processing, since we have a heavy inflow of messages. I am using Spring Boot and KafkaTemplate. How can I create multiple consumers belonging to a single consumer group within a single instance of a Spring Boot application? Will having multiple methods annotated with @KafkaListener create multiple consumers?

Upvotes: 3

Views: 5074

Answers (3)

Hirein

Reputation: 235

As @Salavat Yalalov suggested, I changed my Kafka container factory to a ConcurrentKafkaListenerContainerFactory. On the @KafkaListener method I added the concurrency attribute, which takes an integer as a string and sets the number of consumers to be spawned, like below:

// containerFactory takes the bean name of the factory; other optional attributes are omitted here
@KafkaListener(concurrency = "4", containerFactory = "concurrentKafkaListenerContainerFactory" /* , ...other optional values */)
public void topicConsumer(Message<MyObject> myObject) {
    // ...
}

When I ran it, I saw 4 consumers created in a single consumer group.

Upvotes: 0

Salavat Yalalov

Reputation: 114

You have to use ConcurrentMessageListenerContainer. It delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

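factory.setConcurrency(10) creates 10 KafkaMessageListenerContainer instances. Each instance is assigned a share of the topic's partitions, so the distribution depends on the number of partitions you configured when you created the topic. If the topic has fewer than 10 partitions, the extra instances stay idle.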
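To see how the partition count limits the useful concurrency, here is a minimal plain-Java sketch. It is a simplified model of a range-style split for a single topic, not Kafka's actual assignor code; the class PartitionSpread and its methods are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: this is NOT Kafka's assignor implementation.
class PartitionSpread {

    // Splits partitions 0..partitions-1 into contiguous ranges, one range per consumer.
    // The first (partitions % consumers) consumers receive one extra partition.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    // Counts the consumers that end up with no partition at all.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 10));        // one partition per consumer
        System.out.println(assign(4, 10));         // only the first four consumers own a partition
        System.out.println(idleConsumers(4, 10));  // 6
    }
}
```

Under this model, a concurrency of 10 only pays off when the topic has at least 10 partitions; with 4 partitions, 6 of the 10 consumers would have nothing to read.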

Some preparation steps:

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
private final static String CONSUMER_GROUP = "consumer-group-1";
private final static String TOPIC = "test-topic";

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}


@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message) {
    logger.info(message);
}

public void start() {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    for (int i = 0; i < 10; i++) {
        kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
    }
    
    logger.info("All messages are sent");
}

If you run the method above, you can see that each KafkaMessageListenerContainer instance processes the messages sent to the partition that it serves. Note that the topic needs at least 10 partitions here, because the messages are sent explicitly to partitions 0 through 9. Thread.sleep() is there to give the consumers time to initialize.

2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo                 : Message 5
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo                 : Message 7
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo                 : Message 8
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo                 : Message 1
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo                 : Message 0
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo                 : Message 9
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo                 : Message 4
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo                 : Message 3
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo                 : Message 2
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo                 : Message 6

Upvotes: 5

Artem Bilan

Reputation: 121177

Yes, having multiple @KafkaListener methods will create multiple consumers for you.

With that you can configure all of them to use the same topic and belong to the same group. The Kafka coordinator will distribute partitions to your consumers.

However, if the topic has only one partition, there will be no concurrency: a single partition is processed in a single thread.

Another option is indeed to configure a concurrency; again, several consumers are created according to the concurrency <-> partition relationship.
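A minimal sketch of the first option (several listener methods in one group) could look like the following. It assumes spring-kafka is on the classpath, a reachable broker, and Spring Boot's auto-configured listener container factory with String deserializers. The class name GroupedListeners and the method names are hypothetical; the topic and group names are borrowed from the other answer purely for illustration.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical example class: two listener methods, each backed by its own consumer.
@Component
class GroupedListeners {

    // First consumer: same topic and same group as the second one.
    @KafkaListener(topics = "test-topic", groupId = "consumer-group-1")
    public void listenFirst(String message) {
        System.out.println("first consumer got: " + message);
    }

    // Second consumer: joins the same group, so the partitions of the topic
    // are divided between the two consumers.
    @KafkaListener(topics = "test-topic", groupId = "consumer-group-1")
    public void listenSecond(String message) {
        System.out.println("second consumer got: " + message);
    }
}
```

Because both methods share the group, each record is handled by only one of them. With a single-partition topic, one of the two consumers would simply receive nothing.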

Upvotes: 2
