jitin sharda

Reputation: 31

Spring Cloud Stream: multiple binders for Kafka producer and consumer with separate JAAS configurations don't work together

I am trying to implement a Kafka consumer and a Kafka producer within the same Spring Boot application using Spring Cloud Stream with one binder each. Each works when run on its own, but when both run together only the producer connects to the Kafka cluster; the consumer fails to log in. I suspect the problem is the separate JAAS configurations for the producer and the consumer. My application.yml is below:

spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer-tt1
          useNativeEncoding: true
          destination: consumer-topic
          content-type: application/json
          binder: consumer
        output:
          destination: producer-topic
          useNativeEncoding: true
          content-type: application/*+avro
          binder: producer
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      consumer-properties:
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXCON.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application: XXXXXCON
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXCON.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      producer-properties:
                        key.serializer: org.apache.kafka.common.serialization.StringSerializer
                        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                        schema.registry.url: http:/kafka-schema:8484
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXPRO.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application:
                           id: XXXXXPRO
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXPRO.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
                  schema-registry-client:
                    endpoint: http:/kafka-schema:8484

If I run this application.yml with @EnableBinding(Source.class) or @EnableBinding(Sink.class) on the main class, the application connects to the Kafka cluster as a producer or as a consumer, respectively. When I run the same application.yml with @EnableBinding(Processor.class), the producer still connects, but the consumer fails with the error below because it is not authorized to access the topic.

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [consumer-topic]
2020-03-24 19:45:07.794  WARN 19000 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (consumer-topic). This will affect the health check.
2020-03-24 19:45:07.794  WARN 19000 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : The number of expected partitions was: 1, but 0 has been found instead.There will be 1 idle consumers
2020-03-24 19:45:07.796 ERROR 19000 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:435) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:122) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_162]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at com.rbc.ess.ESSEventTransform.EssEventTransformApplication.main(EssEventTransformApplication.java:21) ~[classes/:na]
Caused by: java.lang.IllegalArgumentException: A list of partitions must be provided
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:446) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:133) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:382) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    ... 24 common frames omitted

Please clarify how to pass multiple JAAS configurations to a Spring Cloud Stream application that uses multiple binders.

Upvotes: 1

Views: 3797

Answers (1)

Gary Russell

Reputation: 174554

Your jaas configuration is incorrect; it should be

jaas:
  config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;

See the documentation.

I can't explain why it works at all when you only have one binder.
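As a rough sketch of where that single-string property could sit for each binder, the fragment below places it under the binder's `configuration` map so that it flattens to the Kafka client property `sasl.jaas.config`. It assumes the binder-level `jaas` blocks from the question are dropped (as far as I know they configure a JVM-wide login context rather than a per-binder one), and it copies broker, keytab, and principal values from the question as they appear there. Treat it as an illustration, not a verified configuration.

```yaml
spring:
  cloud:
    stream:
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxxxx.tt.com:9092
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          kerberos:
                            service:
                              name: kafka
                          jaas:
                            # One string: <loginModule> <controlFlag> <key=value ...>;
                            # Single-quoted in YAML so the inner double quotes and backslashes stay literal.
                            config: 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;'
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxxxx.tt.com:9092
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          kerberos:
                            service:
                              name: kafka
                          jaas:
                            # Same format, but with the producer's own keytab.
                            config: 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXPRO.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;'
```

Because each binder passes its own `sasl.jaas.config` to the Kafka clients it creates, the consumer and producer should be able to log in with different credentials in the same JVM. The truststore and serializer settings from the question are omitted here for brevity and would still be needed.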

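Also, your useNativeEncoding is in the wrong place. On the consumer side the property is useNativeDecoding, and both properties belong under the binding's consumer and producer sub-properties; it should be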

...
        input:
          consumer:
            useNativeDecoding: true
...
        output:
          producer:
            useNativeEncoding: true
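Filled in with the binding values from the question, the corrected `bindings` section might look like the sketch below. Only the placement of the two native encoding/decoding flags changes; the group, destinations, content types, and binder names are copied from the question unchanged.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer-tt1
          destination: consumer-topic
          content-type: application/json
          binder: consumer
          consumer:
            # Consumer side: let the Kafka deserializer handle the payload.
            useNativeDecoding: true
        output:
          destination: producer-topic
          content-type: application/*+avro
          binder: producer
          producer:
            # Producer side: let the Kafka serializer handle the payload.
            useNativeEncoding: true
```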

Upvotes: 0
