bm1729

Reputation: 2375

Multiple streams from a single master topic

How can I make multiple streams from a single master topic? When I do something like this:

KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output1");

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

I get the following error:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)

Do I need to make another instance of KafkaStreams for each stream from "master"?

Upvotes: 16

Views: 9075

Answers (2)

Raghavendra Acharya

Reputation: 171

You can also use the branch feature to achieve this. First create a single stream from the topic:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

Then call branch on it, which returns an array of streams, one per predicate:

final KStream<String, String>[] splitStream = inputStream.branch(
        new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                // write the logic selecting records for the first branch
                return true;
            }
        },
        new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                // write the logic selecting records for the second branch
                return true;
            }
        }
        // .... you can pass further predicates to split inputStream into more branches
);

After branching, splitStream[0] contains the records accepted by the first predicate, splitStream[1] contains those accepted by the second, and so on. To send each branch to an output topic, use the code below.

splitStream[0].to("out_topic1");
splitStream[1].to("out_topic2");
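
One detail worth keeping in mind with branch: each record is routed to the first predicate it matches and to no other, and a record that matches no predicate is dropped. That differs from calling filter separately on the same stream, where a record matching both filters is written to both outputs. The sketch below models only that routing rule in plain Java so it can be run without Kafka. The class BranchSketch and its branch method are hypothetical names made up for illustration; they are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java model of first-match routing.
// This is NOT the Kafka Streams API, only an illustration of the rule.
class BranchSketch {

    // Returns one output list per predicate, in predicate order.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs.get(i).add(record);
                    break; // first match wins; later predicates are not tried
                }
            }
            // a record that matched no predicate is simply not emitted
        }
        return outputs;
    }
}
```

With predicates "starts with a" and "longer than 3 characters", the record "apple" satisfies both but lands only in the first output. If a record has to reach both topics, separate filter calls on the shared stream are the better fit.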

Upvotes: 2

Clemens Valiente

Reputation: 867

Register the topic as a source only once and keep a reference to the resulting KStream:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

Then you can reuse it for each output:

inputStream.filter(..logic1)
        .to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
        .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

Upvotes: 22
