BlackBishop

Reputation: 737

Kafka Streams - set a different time window depending on the message group

Given a KStream, is it possible to set a different time window depending on the message group? For example, 5 seconds for the group "A", 10 seconds for the group "B", and so on.

KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.groupBy((key, msg) -> msg.getPool())
      .aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
      ...

Upvotes: 1

Views: 431

Answers (1)

Dmitry Minkovsky

Reputation: 38113

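The simplest way that comes to mind is to .filter() or .branch() the stream before you .groupBy()/.aggregate(), so that each resulting stream gets its own window length. For example, for pool "A" (repeat with a different filter and wndLength for "B"):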

KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.filter((key, msg) -> msg.getPool().equals("A"))
      .groupBy((key, msg) -> msg.getPool())
      .aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
      ...
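
To make the effect of per-pool window lengths concrete, here is a rough plain-Java sketch (no Kafka Streams dependency) of which tumbling window a record would land in for each pool. It assumes windows are aligned to the epoch, as TimeWindows are, and non-negative timestamps. The class name PerPoolWindows, the WINDOW_MS map, and the 5 s / 10 s values are hypothetical, taken only from the question's example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka Streams.
class PerPoolWindows {
    // Assumed pool -> window length (ms) mapping, mirroring the question's example.
    static final Map<String, Long> WINDOW_MS = new HashMap<>();
    static {
        WINDOW_MS.put("A", 5_000L);
        WINDOW_MS.put("B", 10_000L);
    }

    // Start of the epoch-aligned tumbling window that contains timestampMs
    // (assumes timestampMs >= 0).
    static long windowStart(String pool, long timestampMs) {
        Long size = WINDOW_MS.get(pool);
        if (size == null) {
            throw new IllegalArgumentException("unknown pool: " + pool);
        }
        return timestampMs - (timestampMs % size);
    }

    public static void main(String[] args) {
        long ts = 17_000L;
        // The same timestamp falls into different windows depending on the pool.
        System.out.println("A -> " + windowStart("A", ts));
        System.out.println("B -> " + windowStart("B", ts));
    }
}
```

With the filter/branch approach above, each branch would simply pass its own length to TimeWindows.of(...), and the records would be bucketed along these lines.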

Upvotes: 4
