Reputation: 63
I'm running a Kafka Streams application which creates two internal repartition topics for windowed aggregations that occur on different branches of the topology.
When I run the Kafka Streams application locally or in a docker-compose environment, everything works fine: the AdminClient creates the internal topics when the consumers attempt to read from them.
However, when deploying this application to a Nomad-orchestrated environment where the Kafka brokers use SSL, the application doesn't create the internal topics, despite the SSL certificates being supplied in the global streams configuration.
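For context, this is roughly how the SSL settings are supplied in the global streams configuration (a minimal sketch using plain property-name strings; the broker addresses and keystore paths match the AdminClient config dump below, and the passwords are placeholders):

```java
import java.util.Properties;

public class StreamsSslConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Broker list and SSL settings matching the AdminClient config dump
        props.put("bootstrap.servers", "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "/tmp/kafka_keystore.jks");
        props.put("ssl.keystore.password", "<keystore-password>");   // placeholder
        props.put("ssl.truststore.location", "/tmp/kafka_truststore.jks");
        props.put("ssl.truststore.password", "<truststore-password>"); // placeholder
        return props;
    }

    public static void main(String[] args) {
        // These properties are passed to the KafkaStreams constructor along with
        // the application.id and topology; the internal AdminClient inherits them.
        System.out.println(StreamsSslConfig.build().getProperty("security.protocol"));
    }
}
```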
I can see from the logs that the AdminClient appears to be correctly configured:
bootstrap.servers = [kafka-01.aws:9093, kafka-02.aws:9093, kafka-03.aws:9093]
client.dns.lookup = use_all_dns_ips
client.id = namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-admin
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /tmp/kafka_keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /tmp/kafka_truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
The application eventually ends up being killed, and I see the following logs. I don't see the admin client being called to create the internal topics at all:
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 8 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 9 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 10 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
[namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 11 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
Any idea what might be wrong here? I have another AdminClient bean (it's a Spring Boot application) that I create in the application configuration to help create the output topic and a dead-letter topic. It succeeds in creating those topics and has a similar configuration to the internal Kafka Streams AdminClient, so I'm not sure why the internal topics aren't created. Any help is greatly appreciated.
Upvotes: 1
Views: 586
Reputation: 63
Found the issue. Unfortunately, I was also using Spring Boot and spring-kafka as dependencies in this application. According to the documentation (https://docs.spring.io/spring-kafka/reference/html/#configuring-topics), Spring Boot starts a default instance of KafkaAdmin (unless you set the autoCreate property to false; it's true by default).
Because I was not injecting the SSL properties into the Spring properties, this default AdminClient was not configured for SSL and was superseding the one created by Kafka Streams (which was correctly configured for SSL).
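For reference, the alternative fix would have been to inject the SSL settings into the Spring Boot properties so the auto-configured KafkaAdmin picks them up too. Using the standard `spring.kafka.*` keys (paths taken from the streams config above, passwords are placeholders), that would look roughly like:

```properties
spring.kafka.bootstrap-servers=kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093
spring.kafka.security.protocol=SSL
spring.kafka.ssl.key-store-location=file:/tmp/kafka_keystore.jks
spring.kafka.ssl.key-store-password=<keystore-password>
spring.kafka.ssl.trust-store-location=file:/tmp/kafka_truststore.jks
spring.kafka.ssl.trust-store-password=<truststore-password>
```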
Once I set the autoCreate property to false, the default Spring AdminClient no longer attempts to create topics, so I don't have to inject additional SSL properties, and everything works fine because the admin client created by Kafka Streams is used.
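For anyone hitting the same issue, disabling the auto-creation looks roughly like this: define your own KafkaAdmin bean (which replaces the auto-configured one) and call setAutoCreate(false). A sketch, assuming the broker list from above:

```java
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaAdminConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin admin = new KafkaAdmin(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093"));
        // Stop this KafkaAdmin from creating NewTopic beans on startup, so the
        // Kafka Streams internal AdminClient (which has the SSL configuration)
        // is the one that handles the internal repartition topics.
        admin.setAutoCreate(false);
        return admin;
    }
}
```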
Upvotes: 2