Reputation: 633
I'm trying to process some events in a sliding window with Kafka Streams, but I think I don't understand some of its details, so I'm not able to do what I want.
What I have :
What I want :
Simply put: get all the events in a 10-minute sliding window, iterate over them with a foreach, compute some stats/events in the context of that window, then move on to the next window...
What I tried : I tried to mix the Streams DSL and the Processor API, like this :
val streamBuilder = new StreamsBuilder()
streamBuilder.stream[Int, Person](topic)
  .groupBy((_, value) => PersonWrapper(value.id, value.name))
  .windowedBy(TimeWindows.of(10 * 60 * 1000L).advanceBy(1 * 60 * 1000L))
  // now I have a window of (PersonWrapper, Person), right?
streamBuilder.build().addProcessor(....)
Now I'd like to add a processor to this topology to process each event of the sliding window. I don't understand what TimeWindowedKStream is, or why we need a KGroupedStream to apply a window to events. Could someone enlighten me about Kafka Streams and how to do what I'm trying to do?
Upvotes: 1
Views: 1515
Reputation: 62285
Did you read the documentation? https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
After windowing the stream, use aggregate() for the actual processing (there is no need to attach a Processor manually; the call to aggregate() will implicitly add a Processor for you).
Btw: Kafka Streams does not really support "sliding windows" for aggregation. The window you define is called a hopping window.
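To make the hopping-window point concrete, here is a minimal sketch in plain Scala (no Kafka dependency) of how a single event timestamp maps to several overlapping windows. The names HoppingWindows, Window and windowsFor are hypothetical and chosen for illustration; the arithmetic is assumed to mirror how fixed-size, fixed-advance windows aligned to the epoch are assigned.

```scala
object HoppingWindows {
  // A window covers the half-open interval [startMs, endMs).
  final case class Window(startMs: Long, endMs: Long)

  // Returns every hopping window of the given size and advance that
  // contains timestampMs. Window starts are multiples of advanceMs and
  // never negative.
  def windowsFor(timestampMs: Long, sizeMs: Long, advanceMs: Long): List[Window] = {
    require(sizeMs > 0 && advanceMs > 0 && advanceMs <= sizeMs)
    // Earliest aligned start whose window still contains the timestamp.
    val firstStart =
      math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs
    Iterator
      .iterate(firstStart)(_ + advanceMs)
      .takeWhile(_ <= timestampMs)
      .map(start => Window(start, start + sizeMs))
      .toList
  }
}
```

With a 10-minute size and a 1-minute advance, as in the question, an event at 12 min 30 s falls into ten windows, starting at minute 3 through minute 12. Each of those windows gets its own aggregate, which is why the same event contributes to several results.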
KGroupedStream and TimeWindowedKStream are basically just helper classes: an intermediate representation that allows for a fluent API design.
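The chain of intermediate types can be sketched as follows, with each step annotated. This is only an illustration under stated assumptions: it assumes the kafka-streams-scala library at a 3.x version (for the org.apache.kafka.streams.scala.serialization.Serdes import and TimeWindows.ofSizeWithNoGrace), a hypothetical topic named "events" with String keys and values instead of the question's Person type, and a simple per-window event count standing in for the real statistics.

```scala
import java.time.Duration

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KStream, KTable, TimeWindowedKStream}
import org.apache.kafka.streams.scala.serialization.Serdes._

object WindowedAggregationSketch {
  val builder = new StreamsBuilder()

  // Hypothetical input topic; keys and values are plain strings here.
  val events: KStream[String, String] = builder.stream[String, String]("events")

  // Step 1: grouping by key only yields an intermediate handle.
  val grouped: KGroupedStream[String, String] = events.groupByKey

  // Step 2: attaching a 10-minute window that advances every minute
  // yields another intermediate handle; nothing is computed yet.
  val windowed: TimeWindowedKStream[String, String] =
    grouped.windowedBy(
      TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)).advanceBy(Duration.ofMinutes(1)))

  // Step 3: aggregate() is the call that adds the processing step.
  // Here it counts the events per key and per window.
  val counts: KTable[Windowed[String], Long] =
    windowed.aggregate(0L)((_, _, countSoFar) => countSoFar + 1L)

  // Print each updated per-window result.
  counts.toStream.foreach { (windowedKey, count) =>
    println(s"key=${windowedKey.key()} windowStart=${windowedKey.window().start()} count=$count")
  }

  val topology: Topology = builder.build()
}
```

To compute richer statistics, the Long accumulator could be replaced by a custom aggregate type, which would then need its own Serde in implicit scope.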
The tutorial is also a good way to get started: https://docs.confluent.io/current/streams/quickstart.html
You should also check out the examples: https://github.com/confluentinc/kafka-streams-examples
Upvotes: 1