Reputation: 31
I am trying to implement a Kafka consumer and a Kafka producer within the same Spring Boot application using Spring Cloud Stream and the Kafka binder. Each runs successfully on its own, but when they run together only the producer connects to the Kafka cluster; the consumer fails to log in. I think the issue is with the multiple, different JAAS configurations for the producer and the consumer. Please find my application.yml file below.
spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer-tt1
          useNativeEncoding: true
          destination: consumer-topic
          content-type: application/json
          binder: consumer
        output:
          destination: producer-topic
          useNativeEncoding: true
          content-type: application/*+avro
          binder: producer
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      consumer-properties:
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXCON.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application: XXXXXCON
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXCON.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      producer-properties:
                        key.serializer: org.apache.kafka.common.serialization.StringSerializer
                        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                        schema.registry.url: http:/kafka-schema:8484
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXPRO.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application:
                          id: XXXXXPRO
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXPRO.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
      schema-registry-client:
        endpoint: http:/kafka-schema:8484
If I run this same application.yml with @EnableBinding(Source.class) or @EnableBinding(Sink.class) on the main class, the application connects to the Kafka cluster successfully as a producer or a consumer, respectively. But when I run it with @EnableBinding(Processor.class), the Kafka consumer fails with the error below, while the Kafka producer still connects to the cluster without problems. The issue is only with the consumer, which is reported as not authorized to access the topic.
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [consumer-topic]
2020-03-24 19:45:07.794 WARN 19000 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : No partitions have been retrieved for the topic (consumer-topic). This will affect the health check.
2020-03-24 19:45:07.794 WARN 19000 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : The number of expected partitions was: 1, but 0 has been found instead.There will be 1 idle consumers
2020-03-24 19:45:07.796 ERROR 19000 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create consumer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:435) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:122) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_162]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at com.rbc.ess.ESSEventTransform.EssEventTransformApplication.main(EssEventTransformApplication.java:21) ~[classes/:na]
Caused by: java.lang.IllegalArgumentException: A list of partitions must be provided
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:446) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:133) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:382) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
... 24 common frames omitted
Please check and clarify how to pass multiple JAAS configurations in a Spring Cloud Stream binder application.
Upvotes: 1
Views: 3797
Reputation: 174554
Your jaas configuration is incorrect; it should be
jaas:
  config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;
See the documentation.
I can't explain why it works at all when you only have one binder.
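To show where that single-line value would sit, here is a sketch of the consumer binder only. It assumes the jaas block being corrected is the one nested under configuration.sasl in the question, so that the resulting client property is Kafka's sasl.jaas.config. It also assumes the binder-level jaas block (loginModule/controlFlag/options) is dropped in favour of this per-binder property; that is my reading, not something confirmed above. Host names, paths and the principal are copied from the question unchanged. The value is wrapped in single quotes so that YAML leaves the inner double quotes and backslashes alone.

```yaml
# Sketch only: consumer binder with a per-binder sasl.jaas.config.
# The producer binder would mirror this with its own keytab and principal.
spring:
  cloud:
    stream:
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxxxx.tt.com:9092
                      configuration:
                        security:
                          protocol: SASL_SSL
                        sasl:
                          kerberos:
                            service:
                              name: kafka
                          jaas:
                            # One string: login module, control flag, then key=value options, ending in ';'
                            config: 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;'
```

Because the value lives inside each binder's own configuration map, the consumer and producer binders can each carry a different keytab and principal.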
Also, your useNativeEncoding setting is incorrect; it should be
...
input:
  consumer:
    useNativeDecoding: true
...
output:
  producer:
    useNativeEncoding: true
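For context, this is how those two settings would look with the full property path, reusing the binding names, destinations and binder names from the question. It is a sketch of the bindings section only; the content-type entries are left out purely to keep it short, not because they need to be removed.

```yaml
# Sketch only: native decoding/encoding moved under the
# per-binding consumer and producer sections.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: consumer-topic
          group: consumer-tt1
          binder: consumer
          consumer:
            useNativeDecoding: true   # inbound side: let the Kafka deserializer handle the payload
        output:
          destination: producer-topic
          binder: producer
          producer:
            useNativeEncoding: true   # outbound side: let the Kafka serializer handle the payload
```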
Upvotes: 0