Reputation: 397
I am trying to write my messages to multiple topics with Kafka Streams in Java, using something like:
myStream.to(
    (key, message, recordContext) -> {
        return getMessageTargetTopics(message);
    },
    Produced.with(Serdes.String(), jsonSerde)
);
The issue is that getMessageTargetTopics returns a list of topics, and the to() method doesn't accept lists...
I saw there is already an issue targeting this, but I was wondering whether there is any possible solution in the meantime. Any ideas?
Thanks a lot!
Upvotes: 3
Views: 3861
Reputation: 147
Since the simple solution does not seem to work, you need a list of all the topics that getMessageTargetTopics might return.
Solution 1 (Streams DSL, but not so "clean"): Use branches to implement that logic and select the topic(s).
Solution 2 (IMHO much cleaner): Add all the topics as sinks and build a custom processor that forwards each record to the corresponding sink node(s). Something like context.forward(key, value, To.child("topic")), called in a loop over the topic list, should do it.
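A rough sketch of what Solution 2 could look like, in case it helps. It assumes a Kafka Streams version that ships the org.apache.kafka.streams.processor.api package, where context.forward(record, childName) plays the role of forward(key, value, To.child(...)). It also assumes plain String values instead of the question's jsonSerde. ALL_TOPICS, the "sink-" naming scheme, "input-topic" and the body of getMessageTargetTopics are made up for illustration.

```java
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

class MultiTopicRouter {

    // Hypothetical: every topic getMessageTargetTopics may ever return.
    static final List<String> ALL_TOPICS = List.of("topic-a", "topic-b");

    // Hypothetical stand-in for the question's getMessageTargetTopics.
    static List<String> getMessageTargetTopics(String message) {
        return message.startsWith("a") ? List.of("topic-a") : ALL_TOPICS;
    }

    // Hypothetical naming scheme for sink nodes (one sink node per topic).
    static String sinkName(String topic) {
        return "sink-" + topic;
    }

    // Forwards each record once per target topic, to that topic's sink node.
    static class RouterProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            for (String topic : getMessageTargetTopics(record.value())) {
                context.forward(record, sinkName(topic));
            }
        }
    }

    static Topology buildTopology() {
        Topology topology = new Topology();
        topology.addSource("source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                "input-topic");
        topology.addProcessor("router", RouterProcessor::new, "source");
        // Register one sink per possible topic, all children of the router.
        for (String topic : ALL_TOPICS) {
            topology.addSink(sinkName(topic), topic,
                    Serdes.String().serializer(), Serdes.String().serializer(),
                    "router");
        }
        return topology;
    }
}
```

The topology returned by buildTopology() would then be passed to new KafkaStreams(topology, props) as usual. Every topic has to be registered as a sink up front, which is why the full topic list must be known in advance.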
Upvotes: 3
Reputation: 6613
You can use KStream#flatMapValues(...) to compute the target topics for each value. The topic name has to be wrapped together with the value and unwrapped later.
Note: this approach has a drawback. The message value will contain the topic name, so any application that consumes those messages has to extract the business value from it.
Sample code looks like this:
myStream.flatMapValues(value -> getMessageTargetTopicsWrappedWithValue(value))
    .to((key, value, recordContext) -> value.getTopicName(),
        Produced.with(Serdes.String(), jsonSerde));
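The snippet above does not show getMessageTargetTopicsWrappedWithValue, so here is a minimal sketch of how it might be written in plain Java, without any Kafka dependency. The TopicAndValue wrapper, the String message type, and the routing rule inside getMessageTargetTopics are all assumptions made for illustration; only the method names come from the posts.

```java
import java.util.List;
import java.util.stream.Collectors;

class TopicFanOut {

    // Hypothetical wrapper: pairs a value with the topic it should be sent to.
    static final class TopicAndValue<V> {
        private final String topicName;
        private final V value;

        TopicAndValue(String topicName, V value) {
            this.topicName = topicName;
            this.value = value;
        }

        String getTopicName() { return topicName; }
        V getValue() { return value; }
    }

    // Hypothetical stand-in for the question's getMessageTargetTopics.
    static List<String> getMessageTargetTopics(String message) {
        return message.startsWith("audit:")
                ? List.of("events", "audit")
                : List.of("events");
    }

    // Emits one wrapped copy of the message per target topic, so that
    // flatMapValues turns a single input record into one record per topic.
    static List<TopicAndValue<String>> getMessageTargetTopicsWrappedWithValue(String message) {
        return getMessageTargetTopics(message).stream()
                .map(topic -> new TopicAndValue<>(topic, message))
                .collect(Collectors.toList());
    }
}
```

With a wrapper like this, the serde passed to Produced.with has to be able to serialize TopicAndValue, which is where the drawback mentioned above comes from.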
Upvotes: 0