paiego

Reputation: 3787

One KStream to multiple topics using duplicate messages; broadcast

In a Kafka streaming topology, I would like to route a stream from an incoming topic using a KStream, enhance the message, and output to a destination topic. In the middle of the stream however, I'd also like to send all the messages to another topic.

KStream<Integer, GenericRecord> ksDevice = builder.stream("source_topic", Consumed.with(Serdes.Integer(), genericRecordSerde));

KStream<Integer, GenericRecord> ksEnhance = builder.stream("enhance_topic", Consumed.with(Serdes.Integer(), genericRecordSerde));

ksDevice.join(ksEnhance, (k, v) -> /* enhance message */)
    .to("orthogonal_topic")
    .to("destination_topic");

This obviously isn't possible because .to() terminates the stream. I also can't use kstream.branch() because this will send the message to either one topic or the other; I need to send all messages to both topics.

I saw a post ("Streaming messages to multiple topics") where the author suggested that you can do this with kstream.filter().to("topic"); kstream.filter().to("topic"); but I can't see how this is done without terminating the stream.

Upvotes: 0

Views: 909

Answers (1)

OneCricketeer

Reputation: 191681

The linked post is correct. In your case, you don't need a filter. Assign the joined stream to an intermediate variable and call .to() on it once per topic:

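KStream<Integer, GenericRecord> enhanced = ksDevice.join(...);
enhanced.to("orthogonal_topic");   // adds one sink; the variable is still usable
enhanced.to("destination_topic");  // adds a second sink on the same stream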
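For a fuller picture, here is a minimal sketch of the same idea as a complete topology. It makes a few assumptions that are not in the question: String values stand in for GenericRecord so no Avro serde is needed, and a mapValues() call stands in for the join/enhancement step. The class name FanOutTopology is hypothetical.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FanOutTopology {

    // Builds a topology in which every enhanced record goes to both topics.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: String values instead of GenericRecord, to keep the sketch self-contained.
        KStream<Integer, String> ksDevice =
            builder.stream("source_topic", Consumed.with(Serdes.Integer(), Serdes.String()));

        // Assumption: mapValues() is only a placeholder for the join/enhancement step.
        KStream<Integer, String> enhanced = ksDevice.mapValues(v -> v + "-enhanced");

        // Each to() call attaches its own sink node to the same upstream node,
        // so both topics receive every record.
        enhanced.to("orthogonal_topic", Produced.with(Serdes.Integer(), Serdes.String()));
        enhanced.to("destination_topic", Produced.with(Serdes.Integer(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Printing the description shows one source, one processor, and two sinks.
        System.out.println(build().describe());
    }
}
```

Printing the topology description is a quick way to check that both sinks hang off the same processor node before running anything against a broker.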

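Alternatively, chain .through("orthogonal_topic").to("destination_topic"): through() writes the stream to the given topic and continues the stream by reading back from it. Note that through() has been deprecated since Kafka Streams 2.6.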

But as commented, you're needlessly duplicating the exact same data on the broker. Two distinct consumer groups can be used instead to read from one topic.
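As a rough sketch of the consumer-group approach: two consumers reading the same topic only need different group.id values, and each group then receives every message independently. The broker address, the group names, and the byte-array value deserializer below are assumptions for illustration, not taken from the question.

```java
import java.util.Properties;

public class TwoConsumerGroups {

    // Consumer settings that differ only in group.id.
    static Properties consumerConfig(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", groupId);
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        // Assumption: raw bytes; a real setup would plug in its Avro deserializer here.
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical group names; both would subscribe to "destination_topic".
        Properties orthogonal = consumerConfig("orthogonal-readers");
        Properties destination = consumerConfig("destination-readers");
        System.out.println(orthogonal.getProperty("group.id")
            + " / " + destination.getProperty("group.id"));
    }
}
```

Each Properties object would be passed to its own KafkaConsumer; because offsets are tracked per group, neither consumer affects what the other reads.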

Upvotes: 2
