Bruno Vilhena

Reputation: 63

Kafka streams internal repartition topics not being created

I'm running a Kafka Streams application which creates 2 internal repartition topics for windowed aggregations that occur on different branches of the topology.
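
For context, a minimal sketch of what such a topology might look like (the input topic, predicates, and store names here are hypothetical; the relevant part is the groupBy/windowedBy/count shape on each branch, since the re-keying is what makes Streams create a `<application.id>-<store name>-repartition` topic):

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("input-topic"); // hypothetical topic

    // Branch 1: groupBy re-keys the stream, so Streams needs an internal
    // <application.id>-GroupedResults_1_Store-repartition topic.
    source.filter((key, value) -> value.startsWith("a")) // hypothetical predicate
          .groupBy((key, value) -> value)
          .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
          .count(Materialized.as("GroupedResults_1_Store"));

    // Branch 2: same shape, which yields the second repartition topic.
    source.filter((key, value) -> !value.startsWith("a"))
          .groupBy((key, value) -> value)
          .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
          .count(Materialized.as("GroupedResults_2_Store"));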

When I run the Kafka Streams application locally or in a docker-compose environment, everything seems to work fine and the AdminClient creates the internal topics when the consumers attempt to read from them.

However, when I deploy this application to a Nomad-orchestrated environment where the Kafka brokers use SSL, the application doesn't create the internal topics, despite the SSL certificates being supplied in the global streams configuration.
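
For reference, the SSL settings are supplied once in the top-level streams configuration, which Kafka Streams is supposed to propagate to all of its internal clients, including the AdminClient. A sketch of how that is wired up (the keystore/truststore paths match the config dump below; the password variables stand in for injected secrets):

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "namespace.application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
            "kafka-01.aws:9093,kafka-02.aws:9093,kafka-03.aws:9093");

    // Top-level client settings are inherited by the consumer, producer,
    // and internal AdminClient that Kafka Streams creates.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/kafka_keystore.jks");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/kafka_truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);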

I can see from the logs that the AdminClient seems to be correctly configured:

    bootstrap.servers = [kafka-01.aws:9093, kafka-02.aws:9093, kafka-03.aws:9093]
    client.dns.lookup = use_all_dns_ips
    client.id = namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-admin
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = 
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /tmp/kafka_keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /tmp/kafka_truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS

Basically the application ends up being killed, and I see the following logs. I don't see the admin client being called anywhere in the logs to create the internal topics:

    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 8 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 9 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 10 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}
    [namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=namespace.application-cd79b0a0-ffe2-4694-a70e-d023d7d12908-StreamThread-1-consumer, groupId=namespace.application] Error while fetching metadata with correlation id 11 : {namespace.application-GroupedResults_2_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION, namespace.application-GroupedResults_1_Store-repartition=UNKNOWN_TOPIC_OR_PARTITION}

Any idea what might be wrong here? I have another AdminClient bean (it's a Spring Boot application) that I create in the application configuration to help create the output topic and a dead-letter topic. It succeeds in creating those topics and has similar config to the internal Kafka Streams AdminClient, so I'm not sure why the internal topics aren't created. Any help is greatly appreciated.
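
For completeness, that separate topic-creation setup looks roughly like this (a sketch with hypothetical topic names, partition counts, and replication factors, using spring-kafka's TopicBuilder; these NewTopic beans are created on startup by a Spring-managed admin client, independently of Kafka Streams):

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.TopicBuilder;

    @Configuration
    public class TopicConfiguration {

        @Bean
        public NewTopic outputTopic() {
            return TopicBuilder.name("namespace.application.output")
                    .partitions(3)
                    .replicas(3)
                    .build();
        }

        @Bean
        public NewTopic deadLetterTopic() {
            return TopicBuilder.name("namespace.application.dlt")
                    .partitions(3)
                    .replicas(3)
                    .build();
        }
    }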

Upvotes: 1

Views: 586

Answers (1)

Bruno Vilhena

Reputation: 63

Found the issue: unfortunately I was also using Spring Boot and spring-kafka as dependencies in this application. According to the documentation (https://docs.spring.io/spring-kafka/reference/html/#configuring-topics), Spring Boot starts a default instance of KafkaAdmin unless you set its autoCreate property to false (it's true by default).

As I was not injecting the SSL properties into the Spring properties, this default AdminClient was not configured for SSL, and it was superseding the one created by Kafka Streams (which was correctly configured for SSL).

Once I set the autoCreate property to false, the default Spring KafkaAdmin no longer tries to create topics, so I don't have to inject additional SSL properties, and everything works fine because the admin client created by Kafka Streams is the one being used.
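
A sketch of the fix (one way to do it: overriding the auto-configured bean; KafkaProperties.buildAdminProperties() assumes Spring Boot's Kafka auto-configuration is on the classpath):

    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaAdmin;

    // Replaces Spring Boot's auto-configured KafkaAdmin with one that never
    // creates topics itself, leaving the internal repartition topics to the
    // admin client that Kafka Streams configures (with SSL) on its own.
    @Bean
    public KafkaAdmin kafkaAdmin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setAutoCreate(false);
        return admin;
    }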

Upvotes: 2
