Reputation: 5
I'm using the KStream filter functionality to write to a specific topic based on a matching predicate:
val builder: KStreamBuilder = new KStreamBuilder()
val stream: KStream[String, String] = builder.stream(config)
val filter1 = stream.filter(predicate1)
val filter2 = stream.filter(predicate2)
filter1.to("out-topic-1")
filter2.to("out-topic-2")
new KafkaStreams(builder, properties).start()
How can I make sure that if a message does not match any of these filters it would still be written to a default topic? I've tried defining an output for the "default" stream without any luck:
stream.to("default-topic")
Thanks for any help on the matter.
Upvotes: 0
Views: 965
Reputation: 62330
You can use KStream#branch. It takes multiple predicates, and the last predicate can simply return true unconditionally, so it catches every record the earlier predicates rejected. With branch, each record ends up in exactly one output stream: the one for the first predicate it matches. Your current logic is slightly different, because a record can end up in both output streams (filter1 and filter2). If you need to preserve that behavior, branch won't work, but you can add the following to cover the default case:
val default = stream.filterNot(predicate1).filterNot(predicate2)
default.to("default-topic")
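For the branch variant, a minimal sketch might look like the following. It assumes the same 0.10.x-era API as the question (KStreamBuilder). The predicate bodies, the object name BranchWithDefault, and the buildTopology helper are placeholders invented for illustration; substitute your own predicate1 and predicate2.

```scala
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, Predicate}

object BranchWithDefault {
  // Hypothetical stand-ins for predicate1 and predicate2 from the question.
  val predicate1: Predicate[String, String] = new Predicate[String, String] {
    override def test(key: String, value: String): Boolean =
      value != null && value.startsWith("a")
  }
  val predicate2: Predicate[String, String] = new Predicate[String, String] {
    override def test(key: String, value: String): Boolean =
      value != null && value.startsWith("b")
  }
  // Catch-all: always true, so it receives whatever the earlier predicates rejected.
  val catchAll: Predicate[String, String] = new Predicate[String, String] {
    override def test(key: String, value: String): Boolean = true
  }

  // inputTopic is an assumed parameter; the question passes `config` here instead.
  def buildTopology(inputTopic: String): KStreamBuilder = {
    val builder = new KStreamBuilder()
    val stream: KStream[String, String] = builder.stream(inputTopic)

    // Predicates are evaluated in order; each record goes to the first match only.
    val branches = stream.branch(predicate1, predicate2, catchAll)
    branches(0).to("out-topic-1")
    branches(1).to("out-topic-2")
    branches(2).to("default-topic")
    builder
  }
}
```

You would then start it as in the question, e.g. new KafkaStreams(BranchWithDefault.buildTopology("in-topic"), properties).start(), where "in-topic" is again a placeholder. Note that with this version a record matching both predicate1 and predicate2 is written only to out-topic-1, unlike the two independent filters.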
Upvotes: 1