Reputation: 81
I am running JDK 11, Spring Boot 2.5.5 and Spring Cloud 2020.0,5 and Kafka 2.7.2.
When I have this consumer code with a Message Type of my business object:
@Bean
public Consumer<Message<MyObjectType>> process() {
return input -> {
logger.info("Message = {}", input.getPayload());
};
}
I get the error of "Failed to create topics" (see more detailed logs below), but if I changed my consumer code with String type:
public Consumer<String> consumer() {
return s -> {
logger.info("Message = {}", s);
};
}
I can see the JSON string of my business object. Why does SCS attempt to create topics in the former case and fail? Below is my properties and error logs. Would appreciate
Thanks
Properties:
spring.cloud.stream.kafka.binder.brokers=mykafka:52094
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.bindings.consumer-in-0.destination=my.topic
spring.cloud.stream.bindings.consumer-in-0.group=
spring.cloud.stream.bindings.consumer-in-0.content-type=application/json
Logs:
2022-01-17 13:14:11.614 ERROR 11256 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to create topics
org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
2022-01-17 13:14:11.615 INFO 11256 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
2022-01-17 13:14:11.622 WARN 11256 --- [.EXAMPLE.COM] o.a.k.c.security.kerberos.KerberosLogin : [[email protected]]: TGT renewal thread has been interrupted and will exit.
2022-01-17 13:14:11.622 INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2022-01-17 13:14:11.623 INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-01-17 13:14:11.623 INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2022-01-17 13:14:11.624 ERROR 11256 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for process-in-0; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:347) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:231) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:86) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:421) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.14.jar:5.3.14]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.14.jar:5.3.14]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.8.jar:2.5.8]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:765) ~[spring-boot-2.5.8.jar:2.5.8]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:445) ~[spring-boot-2.5.8.jar:2.5.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.8.jar:2.5.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354) ~[spring-boot-2.5.8.jar:2.5.8]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.8.jar:2.5.8]
at com.example.EventsConsumerApplication.main(EventsConsumerApplication.java:10) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
Upvotes: 1
Views: 2940
Reputation: 81
This issue is due to carelessness on my part. The method signature should be:
public Consumer<Message<MyObjectType>> consumer()
Upvotes: 2