sgsd

Reputation: 81

Spring Cloud Stream Kafka - "Failed to create topics" error for consumer

I am running JDK 11, Spring Boot 2.5.5, Spring Cloud 2020.0.5, and Kafka 2.7.2.

When my consumer takes a Message that wraps my business object type:

@Bean
public Consumer<Message<MyObjectType>> process() {
    return input -> {
        logger.info("Message =   {}", input.getPayload());
    };
}

I get a "Failed to create topics" error (see the detailed logs below). But if I change the consumer to take a String:

@Bean
public Consumer<String> consumer() {
    return s -> {
        logger.info("Message =   {}", s);
    };
}

then I can see the JSON string of my business object. Why does SCS attempt to create topics in the former case and fail? My properties and error logs are below. I would appreciate any help.

Thanks

Properties:

spring.cloud.stream.kafka.binder.brokers=mykafka:52094
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.bindings.consumer-in-0.destination=my.topic
spring.cloud.stream.bindings.consumer-in-0.group=
spring.cloud.stream.bindings.consumer-in-0.content-type=application/json

Logs:

2022-01-17 13:14:11.614 ERROR 11256 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to create topics

org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.

2022-01-17 13:14:11.615  INFO 11256 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
2022-01-17 13:14:11.622  WARN 11256 --- [.EXAMPLE.COM] o.a.k.c.security.kerberos.KerberosLogin  : [[email protected]]: TGT renewal thread has been interrupted and will exit.
2022-01-17 13:14:11.622  INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2022-01-17 13:14:11.623  INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2022-01-17 13:14:11.623  INFO 11256 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2022-01-17 13:14:11.624 ERROR 11256 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception encountered for process-in-0; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:347) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:231) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:197) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:86) ~[spring-cloud-stream-binder-kafka-core-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:421) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:92) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:180) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:137) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.6.jar:3.1.6]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.14.jar:5.3.14]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.14.jar:5.3.14]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145) ~[spring-boot-2.5.8.jar:2.5.8]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:765) ~[spring-boot-2.5.8.jar:2.5.8]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:445) ~[spring-boot-2.5.8.jar:2.5.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:338) ~[spring-boot-2.5.8.jar:2.5.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354) ~[spring-boot-2.5.8.jar:2.5.8]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343) ~[spring-boot-2.5.8.jar:2.5.8]
    at com.example.EventsConsumerApplication.main(EventsConsumerApplication.java:10) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.

Upvotes: 1

Views: 2940

Answers (1)

sgsd

Reputation: 81

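This issue was due to carelessness on my part. The binding properties are keyed on consumer-in-0, but the bean in the failing version was named process, so Spring Cloud Stream derived a binding named process-in-0 (it appears in the ProvisioningException above). With no destination configured for that binding, the binder tried to provision a topic named process-in-0, and the broker rejected that with TopicAuthorizationException. The String version only worked because its method happened to be named consumer. The method name has to match the binding name, so the signature should be: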

public Consumer<Message<MyObjectType>> consumer()
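
To make the naming rule concrete, here is a minimal plain-Java sketch. It does not use Spring at all: BindingNameCheck, inputBindingName and resolveDestination are hypothetical helpers I made up for illustration. They only mimic the "<functionName>-in-<index>" convention and the fallback of using the binding name as the destination when no destination property is set for it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of Spring Cloud Stream. It only imitates
// how an input binding name is derived from the function bean name.
final class BindingNameCheck {

    // Input bindings follow the pattern "<functionName>-in-<index>".
    static String inputBindingName(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Looks up the destination configured for the function's first input
    // binding. When nothing is configured, falls back to the binding name,
    // which is the name a binder would then try to provision as a topic.
    static String resolveDestination(Map<String, String> props, String functionName) {
        String binding = inputBindingName(functionName, 0);
        String key = "spring.cloud.stream.bindings." + binding + ".destination";
        return props.getOrDefault(key, binding);
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        // Same key as in the question's properties.
        props.put("spring.cloud.stream.bindings.consumer-in-0.destination", "my.topic");

        // Bean named "process": no matching property, so the fallback is used.
        System.out.println(resolveDestination(props, "process"));  // process-in-0

        // Bean named "consumer": the configured destination is found.
        System.out.println(resolveDestination(props, "consumer")); // my.topic
    }
}
```

The first call shows the failing case from the question: the lookup misses, so the topic name becomes process-in-0 instead of my.topic. Renaming the properties to process-in-0 instead of renaming the method should work equally well, as long as the two agree.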

Upvotes: 2
