bkumon
bkumon

Reputation: 31

Two Consumer group and Two Topics under single spring boot kafka

I'm new to spring boot kafka and trying to connect my spring-boot kafka application (ms-consumer-app) with two different topics associated with two different consumer group id, both need to be under the ms-consumer-app (Spring boot kafka).

Code Summary

  1. I have two consumerFactory bean method - consumerFactoryG1 with myGroupId-1 and consumerFactoryG2 with myGroupId-2
  2. Two method annotated with @KafkaListener for its corresponding topics ,groupId and containerFactory
  3. Association: container Factory= "consumerFactoryG1", topic = "test-G1", groupId = "myGroupId-1" and container Factory= "consumerFactoryG2", topic = "test-G2", groupId = "myGroupId-2"

However, when i start the ms-consumer-app (Spring boot kafka), i get "org.apache.kafka.common.errors.TopicAuthorizationException"

ERROR [ms-consumer-app] --- [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] org.apache.kafka.clients.Metadata.checkUnauthorizedTopics - [Consumer clientId=consumer-myGroupId-1-3, groupId=myGroupId-1] Topic authorization failed for topics [test-G1]
ERROR [ms-consumer-app] --- [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.error - Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test-G1]

My KafkaConsumerConfig class

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactoryG1() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId-1");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryG1() {
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactoryG1());
        concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
        return concurrentKafkaListenerContainerFactory;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactoryG2() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId-2");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryG2() {
        ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactoryG2());
        concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
        return concurrentKafkaListenerContainerFactory;
    }
    
     @KafkaListener(topics = "test-G1", groupId = "myGroupId-1", containerFactory = "kafkaListenerContainerFactoryG1")
        public void getTopicsG1(@RequestBody String emp) {
        System.out.println("Kafka event consumed is: " + emp);
    }
         @KafkaListener(topics = "test-G2", groupId = "myGroupId-2", containerFactory = "kafkaListenerContainerFactoryG2")
        public void getTopicsG2(@RequestBody String emp) {
        System.out.println("Kafka event consumed is: " + emp);
    }
}

Any leads would be of more help.

Thanks in advance

Upvotes: 0

Views: 1251

Answers (1)

Gary Russell
Gary Russell

Reputation: 174689

Since your configurations are almost the same, you don't need two factories; you can use Spring Boot's auto-configured factory (via application.properties or .yml). You have already specified the groupId on the @KafkaListeners and this overrides the factory group.id.

That is not relevant to your error though; your broker must have security configured and you are not allowed to consume from those topics.

See the Kafka documentation for information about authorization: https://kafka.apache.org/documentation/#security_authz

Upvotes: 0

Related Questions