Reputation: 2375
How can I make multiple streams from a single master topic? When I do something like this:
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
I get the following error:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
Do I need to make another instance of KafkaStreams for each stream from "master"?
Upvotes: 16
Views: 9075
Reputation: 171
you can also use branch feature to achieve this
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
and then using branch will create array of results set
final KStream<String, String>[] splitStream = inputStream.branch(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},
new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},....
//you can write multiple predicate to filter inputStream
});
finally after branching is done splitStream[0] will contain of first filter output and splitStream[1] will contain 2nd filter output and so on. To send this to any output topic you can use below code.
splitStream[0].to("out_topic1");
splitStream[1].to("out_topic2");
Upvotes: 2
Reputation: 867
You can create a KStream that you can reuse:
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
then you can reuse it:
inputStream.filter(..logic1)
.to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
Upvotes: 22