Reputation: 27
I'm struggling to configure Spring Cloud Stream correctly for Kafka-Streams to use SSL with a trust-store and key-store.
In my application i have multiple Streams running, the SSL configuration should be the same for all of them.
The application looks like this:
Stream1: Topic1 > Topic2
Stream2: Topic2 > Topic4 Topic3
Stream3: Topic4 > Topic5
I use the latest Spring-Cloud Stream Framework with Kafka-Streams, with Avro Models. I can configure the schema-registry.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
My application.yaml file looks like this:
spring.application.name: processingapp
spring.cloud:
function.definition: stream1;stream2;stream3
stream:
bindings:
stream1-in-0:
destination: topic1
stream1-out-0:
destination: topic2
stream2-in-0:
destination: topic2
stream2-in-1:
destination: topic3
stream2-out-0:
destination: topic4
stream3-in-0:
destination: topic4
stream3-out-0:
destination: topic5
kafka:
binder:
brokers: kafkabrokerurl.com:9092
configuration: # not recognized at all
security.protocol: SSL
ssl.truststore.location: /mnt/truststore.jks
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/keystore.jks
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
bindings:
default:
consumer:
resetOffsets: false
startOffset: latest
stream1-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream1-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream2-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream2-in-1:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
materializedAs: sbinfo-outage-mapping-store
stream2-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream3-in-0:
consumer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
stream3-out-0:
producer:
keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
streams:
binder:
configuration:
schema.registry.url: https://schemaregistryurl.com # this works
When i start the application with debug-log enabled it shows that it doesn't load the configuration that i set except for the schema-registry.
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [kafkabrokerurl.com:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
So the broker will be loaded correctly but for example truststore.location just remains null.
I tried many different approaches i found here and in some other places.
I found an old issue here and tried this approach but outcome is the same: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/129
configuration: # not recognized at all
"[security.protocol]": SSL
"[ssl.truststore.location]": /mnt/truststore.jks
"[ssl.truststore.type]": JKS
"[ssl.keystore.location]": /mnt/keystore.jks
"[ssl.keystore.type]": JKS
"[ssl.enabled.protocols]": TLSv1.2
I read about that configuration is not working when using multiple binders so i also tried the approach with defining a binder name, but it complains that it doesn't recognize it.
spring.application.name: processingapp
spring.cloud:
function.definition: stream1;stream2;stream3
stream:
bindings:
stream1-in-0:
destination: topic1
binder: ssl
stream1-out-0:
destination: topic2
binder: ssl
stream2-in-0:
destination: topic2
binder: ssl
stream2-in-1:
destination: topic3
binder: ssl
stream2-out-0:
destination: topic4
binder: ssl
stream3-in-0:
destination: topic4
binder: ssl
stream3-out-0:
destination: topic5
binder: ssl
binders:
ssl:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers:
configuration:
security.protocol: SSL
ssl.truststore.location: /mnt/secrets/truststore.jks
ssl.truststore.type: JKS
ssl.keystore.location: /mnt/secrets/keystore.jks
ssl.keystore.type: JKS
ssl.enabled.protocols: TLSv1.2
Error:
2021-07-15 17:11:14.634 ERROR 5216 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: Unknown binder configuration: kstream
I have a @Configuration annotated class where i have my 3 streams declared as Function, BiFunction and again Function.
I hope someone can help me - Thank you.
Upvotes: 1
Views: 1525
Reputation: 174554
You are missing the streams
element in the property name - you are configuring the Kafka MessageChannel Binder instead.
spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
security:
protocol: SSL
Upvotes: 2