Nutritioustim

Reputation: 2738

Provide a stable name for Kafka Streams internal changelog topic

Our team is using Kafka Streams version 3.6.0. We are trying to run a Kafka Streams topology against externally managed brokers. We get the following error when the Kafka Streams application tries to create its internal changelog topic.

org.apache.kafka.streams.errors.StreamsException: Could not create topic <application.id>-KSTREAM-OUTEROTHER-0000000007-store-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
    at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
    ...
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

If we have no way to give our underlying Kafka client admin permissions, can we provide a stable name for the Kafka Streams internal changelog topic? Setting the topic prefix doesn't do the trick, because some of our internal topics have generated names such as testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition.

Upvotes: 0

Views: 230

Answers (2)

Nutritioustim

Reputation: 2738

I see the KStream.join method that @AyoubOmari pointed out. We are using outerJoin, which has an overload with the same signature.

So we're calling StreamJoined<K,V1,V2>.with(Serde<K> keySerde, Serde<V1> valueSerde, Serde<V2> otherValueSerde).withStoreName(String storeName), then passing the result to KStream.outerJoin.

And indeed, Kafka Streams is now trying to create a changelog topic whose name is derived from the supplied storeName. Topic creation still fails because we lack the permissions, but I expect this changelog name to remain the same (be stable) across topology restarts.
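The naming pattern visible in the error messages can be sketched in plain Java. This is only an illustration: ChangelogNames and changelogTopic are hypothetical helpers, not part of the Kafka Streams API, and the split of the topic name into an application id and a store name is an assumption read off the error message below.

```java
// Hypothetical helper, not part of the Kafka Streams API. It only mirrors the
// "<application.id>-<store name>-changelog" pattern seen in the error messages.
final class ChangelogNames {

    // Builds the changelog topic name Kafka Streams is expected to ask for.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Assumed split of the name reported in the StreamsException.
        String applicationId = "notification-verification-service-join-toplogy";
        String storeName = "STORENAME-stream-outer-this-join-store";

        System.out.println(changelogTopic(applicationId, storeName));
    }
}
```

Because both inputs are fixed strings once the store name is supplied explicitly, the resulting topic name no longer depends on generated processor numbers such as 0000000007.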

org.apache.kafka.streams.errors.StreamsException: Could not create topic notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
    at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
    ...
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

Thanks @AyoubOmari !

UPDATE @ 11:20 am

One other thing: can a prefix be attached to the computed changelog topic name, so that A becomes B?

  • A. notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog
  • B. PREFIX-notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog

UPDATE @ 11:34 am

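  • StreamsConfig/topicPrefix only adds a prefix to topic config keys in the properties passed to the constructor of org.apache.kafka.streams.KafkaStreams. It does not change topic names.
  • But setting :application-id to a value that includes the prefix did the trick. The topic name in the error now starts with PREFIX: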
org.apache.kafka.streams.errors.StreamsException: Could not create topic PREFIX-notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
    at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
    ...
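The difference between the two settings can be sketched with plain java.util.Properties. This is a minimal sketch under assumptions: StreamsProps, topicConfigKey and build are hypothetical names, topicConfigKey only imitates what StreamsConfig.topicPrefix does (prepending "topic." to a config key), and the min.insync.replicas value is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka Streams API.
final class StreamsProps {

    // Imitates StreamsConfig.topicPrefix: prefixes a topic-level *config key*.
    // It has no effect on the names of internal topics.
    static String topicConfigKey(String topicConfig) {
        return "topic." + topicConfig;
    }

    // Builds properties whose application.id carries the desired prefix.
    static Properties build(String prefix, String baseApplicationId) {
        Properties props = new Properties();
        // application.id is the leading part of every internal topic name.
        props.put("application.id", prefix + "-" + baseApplicationId);
        // Placeholder topic-level setting applied to internal topics.
        props.put(topicConfigKey("min.insync.replicas"), "2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("PREFIX", "notification-verification-service-join-toplogy");
        // Assumed store name, taken from the error message above.
        String storeName = "STORENAME-stream-outer-this-join-store";
        System.out.println(props.getProperty("application.id") + "-" + storeName + "-changelog");
    }
}
```

Note that application.id also serves as the consumer group id, so changing it makes an existing application start as a new one.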

Upvotes: 0

Nutritioustim

Reputation: 2738

Just a follow-up to @AyoubOmari's comments.

A. "You are doing a join between two KStreams right?"
Correct, yes.

"I think this can be configured in the parameter StreamJoined of KStream.join(). I believe it's the field storeName inside StreamJoined."
I looked at the constructor for org.apache.kafka.streams.KafkaStreams and at its configuration properties, and I don't see where storeName can be passed.

We are using org.apache.kafka.streams.kstream.KStream/outerJoin, and I don't see a way to pass in a storeName there either.

B. KafkaStreams - Streams Client talks about the internals of org.apache.kafka.streams.KafkaStreams.

But Stores Utility — Factory of State Stores also looks at the internals of org.apache.kafka.streams.state.Stores. Maybe there's a way to enforce a consistent name for the changelog there?

C. If we can't enforce a predetermined name for a changelog, there's a potential backup plan. Since changelog names seem to be stable across topology restarts, an initial deployment can include a step where the changelog topic is first created manually. The Kafka Streams topology is then started with the expectation that the changelog name it computes will match the topic that was manually created. It's an ugly solution, but it's the only one available to us if we can't create topics on the fly.

But this raises the central question: for the same topology and configuration inputs, will a changelog name remain the same (be stable) across topology restarts?

Upvotes: 0
