m1771vw

Reputation: 785

Kafka - How to use filter and filterNot at the same time?

I have a Kafka stream that takes data from a topic, and needs to filter that information to two different topics.

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

However, when I do it like this, it reads the data from the topic twice -- not sure if that has any impact on performance as the data gets larger. Is there a way to just filter it once and push it to two topics?

Upvotes: 6

Views: 3692

Answers (1)

Matthias J. Sax

Reputation: 62350

Your approach is correct: the data is not read twice from the topic, and no internal data replication takes place. The only disadvantage of your approach is that both filter predicates are evaluated for each record. That is quite cheap, however, and should not be a performance issue.

However, you could still improve performance by using KStream#branch(), which takes multiple predicates, evaluates them in order, and returns one output stream per predicate. If a record matches a predicate, it is put into the corresponding output stream and evaluation stops (i.e., no further predicate is evaluated for that record). This ensures that each record is added to at most one output stream, or is dropped if no predicate matches.
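The first-match behavior described above can be illustrated without Kafka at all. The following is only a sketch in plain Java: the class `BranchSketch` and its `branch` helper are hypothetical stand-ins written for this illustration, not part of the Kafka Streams API, and they operate on an in-memory list rather than a stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class BranchSketch {

    // Hypothetical helper: mimics first-match routing on an in-memory list.
    // Each record goes to the output of the first predicate it satisfies;
    // records that satisfy no predicate are dropped.
    @SafeVarargs
    public static <V> List<List<V>> branch(List<V> records, Predicate<V>... predicates) {
        List<List<V>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (V record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs.get(i).add(record);
                    break; // stop at the first match, later predicates are skipped
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // "Good" records are the even numbers; the catch-all predicate takes the rest.
        List<List<Integer>> split =
            branch(Arrays.asList(1, 2, 3, 4), v -> v % 2 == 0, v -> true);
        System.out.println(split.get(0)); // prints [2, 4]
        System.out.println(split.get(1)); // prints [1, 3]
    }
}
```

Because evaluation stops at the first match, an always-true predicate in the last position acts as a catch-all for everything the earlier predicates rejected.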

Thus, you can just provide two predicates to branch(): the first one is the same as your original filter() predicate, and the second one always returns true.

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
KStream<String, Model>[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value),
    (key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

I'm not sure this code is more readable than your original version, though. I guess it's a matter of taste; I personally like your original code better because it expresses the semantics more clearly.

The version I added should be slightly more CPU efficient: for records that satisfy the predicate, it is evaluated only once, and for records that do not satisfy it, the second predicate simply returns true (i.e., the original predicate is not evaluated a second time).
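To make the difference concrete, here is a rough sketch that counts how often the "real" predicate runs under each approach. It is plain Java with no Kafka dependency; the class `PredicateCostSketch` and its methods are hypothetical and only model the evaluation pattern described above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class PredicateCostSketch {

    // Models filter() + filterNot(): the predicate runs twice per record.
    public static int evaluationsWithFilterPair(List<Integer> records, Predicate<Integer> isGood) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> { calls.incrementAndGet(); return isGood.test(v); };
        for (Integer record : records) {
            counted.test(record);          // the filter() side
            counted.negate().test(record); // the filterNot() side
        }
        return calls.get();
    }

    // Models branch(predicate, always-true): the predicate runs once per record.
    public static int evaluationsWithBranch(List<Integer> records, Predicate<Integer> isGood) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> { calls.incrementAndGet(); return isGood.test(v); };
        Predicate<Integer> catchAll = v -> true; // trivial, not counted
        for (Integer record : records) {
            if (!counted.test(record)) {
                catchAll.test(record); // only reached for records that failed the predicate
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4);
        System.out.println(evaluationsWithFilterPair(records, v -> v % 2 == 0)); // prints 8
        System.out.println(evaluationsWithBranch(records, v -> v % 2 == 0));     // prints 4
    }
}
```

The saving is one predicate call per record, which only matters if the predicate itself is expensive.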

If you know that most records will end up in splitStreams[1], you could also invert the predicate (and use splitStreams[0] as the "bad" stream) to decrease the number of calls to the second, always-true predicate. But those are only micro-optimizations and should not matter.

Upvotes: 6
