Reputation: 31
I'm new to spring boot kafka and trying to connect my spring-boot kafka application (ms-consumer-app) with two different topics associated with two different consumer group id, both need to be under the ms-consumer-app (Spring boot kafka).
Code Summary
However, when i start the ms-consumer-app (Spring boot kafka), i get "org.apache.kafka.common.errors.TopicAuthorizationException"
ERROR [ms-consumer-app] --- [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] org.apache.kafka.clients.Metadata.checkUnauthorizedTopics - [Consumer clientId=consumer-myGroupId-1-3, groupId=myGroupId-1] Topic authorization failed for topics [test-G1]
ERROR [ms-consumer-app] --- [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.error - Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test-G1]
My KafkaConsumerConfig class
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactoryG1() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId-1");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryG1() {
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactoryG1());
concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
return concurrentKafkaListenerContainerFactory;
}
@Bean
public ConsumerFactory<String, String> consumerFactoryG2() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId-2");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryG2() {
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactoryG2());
concurrentKafkaListenerContainerFactory.setMissingTopicsFatal(false);
return concurrentKafkaListenerContainerFactory;
}
@KafkaListener(topics = "test-G1", groupId = "myGroupId-1", containerFactory = "kafkaListenerContainerFactoryG1")
public void getTopicsG1(@RequestBody String emp) {
System.out.println("Kafka event consumed is: " + emp);
}
@KafkaListener(topics = "test-G2", groupId = "myGroupId-2", containerFactory = "kafkaListenerContainerFactoryG2")
public void getTopicsG2(@RequestBody String emp) {
System.out.println("Kafka event consumed is: " + emp);
}
}
Any leads would be of more help.
Thanks in advance
Upvotes: 0
Views: 1251
Reputation: 174689
Since your configurations are almost the same, you don't need two factories; you can use Spring Boot's auto-configured factory (via application.properties or .yml). You have already specified the groupId
on the @KafkaListener
s and this overrides the factory group.id
.
That is not relevant to your error though; your broker must have security configured and you are not allowed to consume from those topics.
See the Kafka documentation for information about authorization: https://kafka.apache.org/documentation/#security_authz
Upvotes: 0