Roman Oxman

Reputation: 397

Kafka Stream with Java: Send TO multiple topics

I am trying to write my messages to multiple topics with Kafka Streams in Java, using something like:

myStream.to(
    (key, message, recordContext) -> {
         return getMessageTargetTopics(message);
    },
    Produced.with(Serdes.String(), jsonSerde)
);

The issue is that getMessageTargetTopics returns a list of topics, while the topic name extractor passed to to() has to return a single topic name.

I saw there is already an issue targeting this, but I was wondering whether there is any possible solution in the meantime. Any ideas?

Thanks a lot!

Upvotes: 3

Views: 3861

Answers (2)

Schmitzi

Reputation: 147

Since the straightforward approach does not seem to work, you first need a list of all topics that getMessageTargetTopics might return.

Solution 1 (Streams DSL, but not so "clean"): Use branches to implement that logic and select the topic(s).

Solution 2 (IMHO much cleaner): Add all the topics as sinks and build a custom processor that forwards to the corresponding sink node(s). Something like context.forward(key, value, To.child("topic")), called in a loop over the topic list, should do it.

Upvotes: 3

Bartosz Wardziński

Reputation: 6613

You can use KStream#flatMapValues(...) to compute the target topics for each value. Each topic name has to be wrapped together with the value, and unwrapped later.

Note: this approach has a drawback. The message value will contain the topic name, so any application that consumes those messages has to extract the business value from it.

Sample code would look like the following:

myStream.flatMapValues(value -> getMessageTargetTopicsWrappedWithValue(value))
    .to((key, value, recordContext) -> value.getTopicName(),
        Produced.with(Serdes.String(), jsonSerde));
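
The helper getMessageTargetTopicsWrappedWithValue is not shown above. Here is a minimal sketch of what it might look like, in plain Java with no Kafka dependency. The names TopicRouted, TopicFanOut and wrapWithTopics are hypothetical and chosen only for illustration; only getTopicName() matches the accessor used in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper: pairs one target topic with the original business value.
final class TopicRouted<V> {
    private final String topicName;
    private final V payload;

    TopicRouted(String topicName, V payload) {
        this.topicName = topicName;
        this.payload = payload;
    }

    // Accessor called by the topic name extractor passed to to().
    String getTopicName() { return topicName; }

    // The original value, which consumers have to unwrap.
    V getPayload() { return payload; }
}

// Hypothetical helper holding the fan-out logic.
final class TopicFanOut {
    private TopicFanOut() {}

    // Produces one wrapped copy of the value per target topic, in list order.
    static <V> List<TopicRouted<V>> wrapWithTopics(V value, List<String> targetTopics) {
        List<TopicRouted<V>> wrapped = new ArrayList<>(targetTopics.size());
        for (String topic : targetTopics) {
            wrapped.add(new TopicRouted<>(topic, value));
        }
        return wrapped;
    }
}
```

With this in place, getMessageTargetTopicsWrappedWithValue(value) could simply return TopicFanOut.wrapWithTopics(value, getMessageTargetTopics(value)). An empty topic list yields an empty result, so flatMapValues emits nothing for that message. The value serde passed to Produced then has to serialize the wrapper type rather than the bare message.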

Upvotes: 0
