Our team is using Kafka Streams version 3.6.0. We are trying to run a Kafka Streams topology against some externally managed brokers. I'm getting the following error when the Kafka Streams application tries to create its internal changelog topic:
org.apache.kafka.streams.errors.StreamsException: Could not create topic <application.id>-KSTREAM-OUTEROTHER-0000000007-store-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
...
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
If we don't have a way to give our underlying Kafka client admin permissions, can we provide a stable name for the Kafka Streams internal changelog topic?
Setting the topic prefix doesn't seem to do the trick, because some of our internal topics have names like testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition.
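For reference, the internal topic names in these traces appear to follow the pattern <application.id>-<name>-changelog (or -repartition), where an auto-generated <name> ends in a zero-padded, topology-wide counter such as 0000000008. Below is a minimal plain-Java sketch of that pattern, assuming it holds; the class and its methods are hypothetical helpers, not Kafka Streams API.

```java
public class InternalTopicNames {

    // Hypothetical helper: mirrors the "<application.id>-<storeName>-changelog"
    // pattern seen in the stack traces; not a Kafka Streams API.
    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Hypothetical helper: the same idea for repartition topics.
    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Hypothetical helper: an auto-generated name is a prefix plus a
    // 10-digit, zero-padded counter that reflects the operator's position.
    public static String generatedName(String prefix, int index) {
        return prefix + String.format("%010d", index);
    }

    public static void main(String[] args) {
        String appId = "testapplication";
        // prints testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition
        System.out.println(repartitionTopic(appId, generatedName("KSTREAM-REDUCE-STATE-STORE-", 8)));
        // With an explicit (placeholder) store name, no counter is involved.
        System.out.println(changelogTopic(appId, "my-reduce-store"));
    }
}
```

Because the counter tracks each operator's position in the topology, adding or reordering operators upstream can change an auto-generated name, while an explicitly supplied name stays fixed.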
Prior research
I see the KStream.join method that @AyoubOmari pointed out. We are using outerJoin, which has the same method signature.
So we're calling StreamJoined.with(Serde<K> keySerde, Serde<V1> valueSerde, Serde<V2> otherValueSerde).withStoreName(String storeName) (ref) and then passing the result to KStream.outerJoin.
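A minimal sketch of that call chain, assuming String keys and values; the topic names left-topic, right-topic and joined-topic, the 5-minute window and the joiner are placeholders. Printing Topology.describe() lists the join's state stores, so it shows which store names (and therefore which changelog names) the topology will use without connecting to any broker.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class NamedOuterJoin {

    // Builds a stream-stream outer join whose state stores get a
    // user-supplied base name instead of an auto-generated one.
    public static Topology buildTopology(String storeName) {
        StreamsBuilder builder = new StreamsBuilder();

        // Placeholder input topics.
        KStream<String, String> left =
            builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> right =
            builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.String()));

        left.outerJoin(
                right,
                // In an outer join either side may be null; string concatenation tolerates that.
                (l, r) -> l + "|" + r,
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                    .withStoreName(storeName))
            .to("joined-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // The description lists each processor together with the stores it uses.
        System.out.println(buildTopology("STORENAME").describe());
    }
}
```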
And indeed, Kafka Streams now tries to create a changelog topic whose name is derived from the supplied storeName. So I expect this changelog name to remain the same (be stable) across topology restarts. The error now references that name:
org.apache.kafka.streams.errors.StreamsException: Could not create topic notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
...
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Thanks, @AyoubOmari!
UPDATE @ 11:20 am
One other thing: can you attach a prefix to the computed storeName, so that A becomes B?
A: notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog
B: PREFIX-notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog
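If the <application.id>-<storeName>-changelog pattern holds, the only part in front of the store name is application.id, so one way to get B would be to prefix application.id itself. A minimal sketch with plain java.util.Properties, assuming that pattern; the helper is hypothetical, and the literal key application.id is the value of StreamsConfig.APPLICATION_ID_CONFIG. Keep in mind that application.id is also used as the consumer group id, so changing it effectively starts a new application with its own committed offsets and internal topics.

```java
import java.util.Properties;

public class PrefixedApplicationId {

    // Same string as StreamsConfig.APPLICATION_ID_CONFIG; kept as a literal
    // so this sketch needs only the JDK.
    public static final String APPLICATION_ID = "application.id";

    // Hypothetical helper: builds Streams properties whose application.id
    // carries the given prefix.
    public static Properties withPrefixedApplicationId(String prefix, String baseAppId) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID, prefix + "-" + baseAppId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withPrefixedApplicationId(
            "PREFIX", "notification-verification-service-join-toplogy");
        // Assuming <application.id>-<storeName>-changelog, this prints name B.
        System.out.println(props.getProperty(APPLICATION_ID)
            + "-STORENAME-stream-outer-this-join-store-changelog");
    }
}
```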
UPDATE @ 11:34 am
StreamsConfig.topicPrefix only adds a prefix to topic config keys passed to the org.apache.kafka.streams.KafkaStreams constructor.
org.apache.kafka.streams.errors.StreamsException: Could not create topic PREFIX-notification-verification-service-join-toplogy-STORENAME-stream-outer-this-join-store-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
...
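The distinction can be seen directly: StreamsConfig.topicPrefix(...) only builds a config key starting with "topic.", which Kafka Streams applies as a topic-level setting when it creates internal topics. A minimal sketch; the one-day retention value is an arbitrary placeholder.

```java
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TopicPrefixDemo {

    // StreamsConfig.topicPrefix("retention.ms") returns "topic.retention.ms":
    // a config key for internal topics, not a prefix for their names.
    public static Properties internalTopicOverrides() {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "86400000");
        return props;
    }

    public static void main(String[] args) {
        // Prints the single override, keyed as topic.retention.ms.
        System.out.println(internalTopicOverrides());
    }
}
```

Nothing in this setting changes the internal topic names themselves.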
Just a follow-up to @AyoubOmari's comments.
A.
"You are doing a join between two KStreams right?"
Correct, yes.
"I think this can be configured in the parameter StreamJoined of KStream.join(). I believe it's the field storeName inside StreamJoined."
This is the constructor for org.apache.kafka.streams.KafkaStreams.
These are its configuration properties, and I don't see where storeName can be passed.
We are using org.apache.kafka.streams.kstream.KStream.outerJoin, and I don't see a way to pass a storeName to it either.
B.
"KafkaStreams - Streams Client" talks about the internals of org.apache.kafka.streams.KafkaStreams.
But "Stores Utility - Factory of State Stores" also looks at the internals of org.apache.kafka.streams.state.Stores. Maybe there's a way to enforce a consistent name for the changelog there?
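A minimal sketch of what Stores offers, assuming the Processor API (a StoreBuilder added via Topology.addStateStore or StreamsBuilder.addStateStore) rather than the DSL join in the question. Stores lets you choose the store name and pass extra changelog topic configs via withLoggingEnabled but, as far as I can tell, not the changelog topic name itself, which would still be derived from application.id and the store name. The store name my-counts and the min.insync.replicas override are placeholders.

```java
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class NamedStoreDemo {

    // Builds a persistent key-value store with an explicit name. As far as
    // I can tell, the changelog name is derived from this store name and
    // cannot be set directly.
    public static StoreBuilder<KeyValueStore<String, Long>> countsStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("my-counts"),
                Serdes.String(),
                Serdes.Long())
            // Topic-level configs for this store's changelog (placeholder value).
            .withLoggingEnabled(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
    }

    public static void main(String[] args) {
        StoreBuilder<KeyValueStore<String, Long>> store = countsStore();
        System.out.println(store.name() + " logging=" + store.loggingEnabled());
    }
}
```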
C.
If we can't enforce a predetermined name for a changelog, there's a potential backup plan. Since changelog names seem to be stable across topology restarts, initial deployments could include a step that manually creates the changelog topic first. The Kafka Streams topology is then started with the expectation that the changelog name will match the topic that was manually created. It's an ugly solution, but it's the only one available to us if we can't create topics on the fly.
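A sketch of that pre-create step using the Java Admin client, to be run by whatever has create-topic rights. Assumptions beyond the name pattern seen in the traces: the changelog needs the same partition count as the join's input topics, and because a stream-stream join store is a window store, Kafka Streams would normally create its changelog with cleanup.policy=compact,delete and a retention.ms derived from the join window plus windowstore.changelog.additional.retention.ms (retention is left at the broker default here). The partition and replication counts are placeholders. Also note that the first trace names a KSTREAM-OUTEROTHER store and the second an outer-this store, so an outer join needs at least two changelog topics pre-created; listing the stores via Topology.describe() is one way to find them all.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class PreCreateChangelog {

    // Describes a changelog topic under the assumed naming pattern and a
    // windowed-store cleanup policy; retention.ms is not set here.
    public static NewTopic changelogTopic(String appId, String storeName, int partitions, short replication) {
        return new NewTopic(appId + "-" + storeName + "-changelog", partitions, replication)
            .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE));
    }

    // Creates the topic; requires a reachable broker and create-topic rights.
    public static void create(String bootstrapServers, NewTopic topic)
            throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(List.of(topic)).all().get();
        }
    }

    public static void main(String[] args) throws Exception {
        NewTopic topic = changelogTopic(
            "notification-verification-service-join-toplogy",
            "STORENAME-stream-outer-this-join-store",
            3, (short) 3);
        System.out.println(topic);
        // create("broker:9092", topic); // placeholder address
    }
}
```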
But this raises the central question: for the same topic and configuration inputs, will a changelog name remain the same (be stable) across topology restarts?
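One way to check this empirically is to build the same topology twice and compare the descriptions, since the generated names that feed internal topic names appear there. A minimal sketch, assuming a selectKey/groupByKey/reduce pipeline similar to the one behind the REDUCE-STATE-STORE topic in the question; the topic names are placeholders. This only demonstrates determinism for identical code and the same application.id. Changing the operators ahead of a generated name can renumber it.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyStabilityCheck {

    // Re-keys, groups and reduces, which makes Kafka Streams generate a
    // KSTREAM-REDUCE-STATE-STORE-... store plus a repartition topic.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((k, v) -> v)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .reduce((a, b) -> b)
            .toStream()
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Each StreamsBuilder starts its own name counter, so identical code
    // should yield identical descriptions, generated names included.
    public static boolean isStable() {
        return build().describe().toString().equals(build().describe().toString());
    }

    public static void main(String[] args) {
        System.out.println("stable=" + isStable());
        System.out.println(build().describe());
    }
}
```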