Reputation: 737
I wonder if given a KStream, is possible to set a different time window depending on the message group, for example, for groupBy "A" 5 seconds, for groupBy "B" 10 seconds ...
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...
Upvotes: 1
Views: 431
Reputation: 38113
The simplest way that comes to mind is to .filter()
or .branch()
before you .groupBy()
/.aggregate()
, like:
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.filter((key, msg) -> msg.getPool().equals("A"))
.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...
Upvotes: 4