Reputation: 665
I have a Spring Cloud Stream application using Spring Boot 2.0.3.RELEASE and Spring Cloud Stream libraries under Finchley.SR1. With properties:
spring:
kafka:
ssl:
protocol: SASL_SSL
keystore-location: classpath:kafka.p12 // created from a PEM with private key and certificate
keystore-password: passw0rd
key-store-type: PKCS12
cloud:
stream:
bindings:
...
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
brokers: myhost.mydomain.com
trying to establish an SSL connection with Kafka with properties:
listeners=SASL_SSL://myhost.mydomain.com:9092
advertised.listeners=SASL_SSL://myhost.mydomain.com:9092
But I am getting following exception:
2018-08-29 17:49:13.421 ERROR [alef-data-connector,,,] 83300 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
2018-08-29 17:49:13.422 ERROR [alef-data-connector,,,] 83300 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391) ~[spring-cloud-stream-binder-kafka-core-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:212) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:126) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:153) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:138) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:244) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:221) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:252) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:46) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_162]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_162]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_162]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:47) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:29) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:157) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:121) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:885) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.finishRefresh(ReactiveWebServerApplicationContext.java:83) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
at com.alefeducation.connector.ConnectorApplicationKt.main(ConnectorApplication.kt:13) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Is my configuration for my Spring Cloud Stream application correct. Do we have Spring Cloud Stream documentation explaining SSL connection with Kafka?
Upvotes: 0
Views: 4988
Reputation: 174494
When you use multi-binder support (via the environment
property), there is no inheritance of root boot properties.
You either need to move the spring.kafka.ssl
properties under the environment
or, if you only have one binder, remove the multi-binder configuration and just declare the binder properties at the root.
EDIT
This works for me...
spring:
cloud:
stream:
bindings:
input:
destination: so52080276
binder: kafka1
group: so52080276
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
brokers: myhost.mydomain.com
configuration:
security.protocol: SASL_SSL
ssl.keystore.location: classpath:kafka.p12 // created from a PEM with private key and certificate
ssl.keystore.password: passw0rd
ssl.key.store.type: PKCS12
Upvotes: 2