mac
mac

Reputation: 643

KafkaMessageListenerContainer vs ConcurrentMessageListenerContainer

If I use listener.concurrency property, would this use KafkaMessageListenerContainer or ConcurrentMessageListenerContainer? Note I have not defined any beans explicitly and I leave the required bean creation to spring boot.

I wanted to know if my understanding is correct. I would want parallelism in consuming the message from topic which has got multiple partitions say 3. So is it fine if I configure it in application.yml file as follows. I have tried below configuration and see 3 consumers were created.

spring:
   kafka:
     consumer:
        group-id: group-id
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
     listener:
        concurrency: 3 

So my question is, do I have to create a bean to configure ConcurrentMessageListenerContainer or the above listener configuration would internally would use ConcurrentMessageListenerContainer instead of KafkaMessageListenerContainer when it sees listener.concurrency=3?

Upvotes: 8

Views: 15134

Answers (1)

Rohit Yadav
Rohit Yadav

Reputation: 2568

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

Concurrency is the property of ConcurrentMessageListenerContainer. If we are defining this property in application.yml means we want to use ConcurrentMessageListenerContainer.

enter image description here

In Kafka, only one consumer is allowed to read data from one partition at a time. Using KafkaMessageListenerContainer will provide only single threaded message listener consumer. KafkaMessageListenerContainer

Using ConcurrentMessageListenerContainer will provide the number of KafkaMessageListenerContainer defined in the Concurrency property in application.yml.

We can get the same behavior using following Listener using Java code.

@Configuration
@EnableKafka
public class KafkaListenerConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

I would suggest you to go through the below link to have a practical example of the same.

https://howtoprogram.xyz/2016/09/25/spring-kafka-multi-threaded-message-consumption/

Upvotes: 8

Related Questions