Reputation: 643
If I use listener.concurrency property, would this use KafkaMessageListenerContainer or ConcurrentMessageListenerContainer? Note I have not defined any beans explicitly and I leave the required bean creation to spring boot.
I wanted to know if my understanding is correct. I would want parallelism in consuming the message from topic which has got multiple partitions say 3. So is it fine if I configure it in application.yml
file as follows. I have tried below configuration and see 3 consumers were created.
spring:
kafka:
consumer:
group-id: group-id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
listener:
concurrency: 3
So my question is, do I have to create a bean to configure ConcurrentMessageListenerContainer
or the above listener configuration would internally would use ConcurrentMessageListenerContainer
instead of KafkaMessageListenerContainer
when it sees listener.concurrency=3
?
Upvotes: 8
Views: 15134
Reputation: 2568
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
Concurrency
is the property of ConcurrentMessageListenerContainer.
If we are defining this property in application.yml
means we want to use ConcurrentMessageListenerContainer.
In Kafka, only one consumer is allowed to read data from one partition at a time.
Using KafkaMessageListenerContainer
will provide only single threaded message listener consumer. KafkaMessageListenerContainer
Using ConcurrentMessageListenerContainer
will provide the number of KafkaMessageListenerContainer
defined in the Concurrency
property in application.yml
.
We can get the same behavior using following Listener using Java code.
@Configuration
@EnableKafka
public class KafkaListenerConfiguration {
@Autowired
private Environment environment;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.setBatchListener(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.bootstrap-servers"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1111");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
I would suggest you to go through the below link to have a practical example of the same.
https://howtoprogram.xyz/2016/09/25/spring-kafka-multi-threaded-message-consumption/
Upvotes: 8