I have an existing stream which uses two topics as its source:
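import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
// implicit Serde[K] and Serde[V] are assumed to be in scope
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
stream1
  .merge(stream2)
  .groupByKey
  .reduce(reduceValues)
  .toStream
  .to("result-topic")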
The auto-generated name of the StateStore is KSTREAM-REDUCE-STATE-STORE-0000000003.
Now I need to add one more topic as a source. However, adding a new source increments the internal counter Kafka Streams uses to generate names, so the StateStore becomes KSTREAM-REDUCE-STATE-STORE-0000000005. I don't want to lose the current state, so I explicitly provide the name of the old StateStore:
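import org.apache.kafka.streams.scala.ByteArrayKeyValueStore
import org.apache.kafka.streams.scala.kstream.Materialized
val streamsBuilder = new StreamsBuilder
val stream1 = streamsBuilder.stream[K, V]("topic1")
val stream2 = streamsBuilder.stream[K, V]("topic2")
val stream3 = streamsBuilder.stream[K, V]("topic3") // new topic
stream1
  .merge(stream2)
  .merge(stream3) // merge new topic
  .groupByKey
  // reuse the store name that was auto-generated for the old topology
  .reduce(reduceValues)(
    Materialized.as[K, V, ByteArrayKeyValueStore]("KSTREAM-REDUCE-STATE-STORE-0000000003")
  )
  .toStream
  .to("result-topic")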
It seems to work, but I'm not sure whether I'm interfering with Kafka internals, because the StateStore is different from what it was initially. Any comments?
To be honest, the safest option would be to give this state store a human-readable name but, as you mentioned, you would lose the existing state.
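The reason renaming loses state is that Kafka Streams backs a persistent store with a changelog topic named <application.id>-<storeName>-changelog, so the store name is what ties the application to its existing data. The following is a minimal plain-Scala sketch of that naming convention only; ChangelogNaming, changelogTopic, sharesChangelog and the application id "my-app" are hypothetical names for illustration, not Kafka Streams APIs.

```scala
// Hypothetical helper illustrating the documented changelog naming pattern
// <application.id>-<storeName>-changelog; not part of the Kafka Streams API.
object ChangelogNaming {
  def changelogTopic(applicationId: String, storeName: String): String =
    s"$applicationId-$storeName-changelog"

  // True when two store names map to the same changelog topic,
  // i.e. the store under its new name would find the existing changelog.
  def sharesChangelog(applicationId: String, oldStore: String, newStore: String): Boolean =
    changelogTopic(applicationId, oldStore) == changelogTopic(applicationId, newStore)
}
```

With "my-app" as the application id, pinning the old name keeps my-app-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog, whereas a human-readable name such as "reduced-values" (also hypothetical) would point at a new, empty changelog topic.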
I assume there shouldn't be any problem with what you did (at least until you introduce another change in the code :)). The index 0000000003 will now be assigned to a different operator (in this topology, most likely the first merge, KSTREAM-MERGE-0000000003), whose name has a different prefix than the store's, so there won't be any conflicts (although I am not 100% sure about Kafka Streams internals there).
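Why the number moves from 0000000003 to 0000000005, and why reusing the old name does not clash, can be illustrated with a simplified model. The assumption here is that the DSL builder draws processor names and store names from one shared counter, zero-padded to ten digits, in the order operators are defined, and that groupByKey draws no name when no repartitioning is needed. NamingModel and its members are hypothetical and only mimic that behaviour for this particular topology; they are not Kafka Streams classes.

```scala
// Simplified, hypothetical model of Kafka Streams auto-naming:
// one counter shared by processor names and store names.
object NamingModel {
  sealed trait Step
  final case class Processor(prefix: String) extends Step
  final case class Store(prefix: String) extends Step

  // Assigns names in definition order, each step taking the next index.
  def assignNames(steps: Seq[Step]): Seq[String] =
    steps.zipWithIndex.map {
      case (Processor(prefix), i) => f"$prefix-$i%010d"
      case (Store(prefix), i)     => f"$prefix-STATE-STORE-$i%010d"
    }

  // Steps up to and including the reduce store for n merged source topics.
  // groupByKey is left out: in this model it draws no index.
  def topology(sources: Int): Seq[Step] =
    Seq.fill(sources)(Processor("KSTREAM-SOURCE")) ++
      Seq.fill(sources - 1)(Processor("KSTREAM-MERGE")) ++
      Seq(Store("KSTREAM-REDUCE"))
}
```

In this model, two sources yield KSTREAM-REDUCE-STATE-STORE-0000000003 and three sources yield KSTREAM-REDUCE-STATE-STORE-0000000005, while index 3 goes to KSTREAM-MERGE-0000000003, a processor name that cannot collide with the pinned store name.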
There is also the Application Reset Tool, which lets you reprocess your input topics and regenerate the aggregations. But I don't know if it is applicable to your case: the retention policy on your input topics might prevent the tool from regenerating the exact aggregates.