mouli

Reputation: 11

Kafka Streams: filtering data from a stream to write to another topic returns an empty result

countInput is the KStream here.

countInput.print(Printed.toSysOut());

I get the following output:

null, {def=;abc=123;ghi=}

I am trying to get the count for the "abc=" entry alone.

KTable<String, Long> Counts = countInput
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(";")))
    .groupBy((key, value) -> value)
    .count();

Counts.filter((key,value) -> key.startsWith("ABC="));  

But the above code doesn't seem to work: the filtered result is empty. Any input would be appreciated. Thanks.

Upvotes: 0

Views: 705

Answers (1)

Nishu Tayal

Reputation: 20840

You are converting every value to lowercase in the flatMapValues operation, while the filter operation compares the key against an uppercase prefix.

Counts.filter((key,value) -> key.startsWith("ABC="));  

Hence it won't match any key in the stream, and the result is empty. Replace "ABC=" with "abc=":

Counts.filter((key,value) -> key.startsWith("abc="));  
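
To see the mismatch without a running Kafka cluster, here is a minimal plain-Java sketch of the same logic. It is not Kafka Streams code: the class CaseFilterSketch and the helpers countTokens and filterByPrefix are hypothetical names that only imitate the flatMapValues / groupBy / count / filter steps with the standard library, and the sample value is assumed to be the printed record without its surrounding braces.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the topology in the question; uses only java.util.
class CaseFilterSketch {

    // Imitates flatMapValues + groupBy + count:
    // lowercase the value, split it on ';', and count each resulting token.
    static Map<String, Long> countTokens(String value) {
        return Arrays.stream(value.toLowerCase(Locale.ROOT).split(";"))
                .collect(Collectors.groupingBy(
                        Function.identity(), LinkedHashMap::new, Collectors.counting()));
    }

    // Imitates KTable.filter: keeps only the entries whose key starts with the prefix.
    static Map<String, Long> filterByPrefix(Map<String, Long> counts, String prefix) {
        return counts.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Assumed sample value, taken from the printed record in the question.
        Map<String, Long> counts = countTokens("def=;abc=123;ghi=");

        System.out.println(filterByPrefix(counts, "ABC=")); // prints {}
        System.out.println(filterByPrefix(counts, "abc=")); // prints {abc=123=1}
    }
}
```

Note also that KTable.filter does not modify Counts; it returns a new KTable. To use the filtered result, or write it to another topic, it has to be assigned or chained, for example with toStream().to(...) and serdes that match the String key and Long value.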

Upvotes: 1
