Bilal Wahla
Bilal Wahla

Reputation: 665

Spring Cloud Stream SSL connection with Kafka

I have a Spring Cloud Stream application using Spring Boot 2.0.3.RELEASE and Spring Cloud Stream libraries under Finchley.SR1. With properties:

spring:
  kafka:
    ssl:
      protocol: SASL_SSL
      keystore-location: classpath:kafka.p12 // created from a PEM with private key and certificate
      keystore-password: passw0rd
      key-store-type: PKCS12
  cloud:
    stream:
      bindings:
      ...
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com

trying to establish an SSL connection with Kafka with properties:

listeners=SASL_SSL://myhost.mydomain.com:9092
advertised.listeners=SASL_SSL://myhost.mydomain.com:9092

But I am getting following exception:

2018-08-29 17:49:13.421 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Cannot initialize Binder

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-08-29 17:49:13.422 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391) ~[spring-cloud-stream-binder-kafka-core-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:212) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:126) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:153) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:138) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:244) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:221) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:252) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:46) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_162]
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_162]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_162]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:47) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:29) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:157) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:121) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:885) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.finishRefresh(ReactiveWebServerApplicationContext.java:83) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at com.alefeducation.connector.ConnectorApplicationKt.main(ConnectorApplication.kt:13) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Is my configuration for my Spring Cloud Stream application correct. Do we have Spring Cloud Stream documentation explaining SSL connection with Kafka?

Upvotes: 0

Views: 4988

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

When you use multi-binder support (via the environment property), there is no inheritance of root boot properties.

You either need to move the spring.kafka.ssl properties under the environment or, if you only have one binder, remove the multi-binder configuration and just declare the binder properties at the root.

EDIT

This works for me...

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: so52080276
          binder: kafka1
          group: so52080276
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com
                      configuration:
                        security.protocol: SASL_SSL
                        ssl.keystore.location: classpath:kafka.p12 // created from a PEM with private key and certificate
                        ssl.keystore.password: passw0rd
                        ssl.key.store.type: PKCS12

Upvotes: 2

Related Questions