Neyoh

Reputation: 633

Process items in a window with Kafka Streams

I'm trying to process some events in a sliding window with Kafka Streams, but I think I don't understand some details of Kafka Streams, so I'm not able to do what I want.

What I have:

What I want:

To put it simply: get all the events in a 10-minute sliding window, iterate over them with a foreach, compute some stats/events in the context of that window, then move on to the next window...

What I tried: I tried to mix the Streams DSL and the Processor API, like this:

    val streamBuilder = new StreamsBuilder()
    streamBuilder.stream[Int, Person](topic)
      .groupBy((_, value) => PersonWrapper(value.id, value.name))
      .windowedBy(TimeWindows.of(10 * 60 * 1000L).advanceBy(1 * 60 * 1000L))
    // now I have a window of (PersonWrapper, Person), right?
    streamBuilder.build().addProcessor(....)

Next, I'd add a processor to this topology to process each event of the sliding window. I don't understand what a TimeWindowedKStream is, or why we need a KGroupedStream in order to apply a window to events. Could someone enlighten me about Kafka Streams and what I'm trying to do?

Upvotes: 1

Views: 1515

Answers (1)

Matthias J. Sax

Reputation: 62285

Did you read the documentation: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

  1. Windowing is a special form of grouping (grouping based on time)
  2. Grouping is always required to compute an aggregation in Kafka Streams
  3. After you have a grouped and windowed stream, you call aggregate() for the actual processing (no need to attach a Processor manually; the call to aggregate() implicitly adds a Processor for you).
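The three steps above (group, window, aggregate) can be sketched with the Kafka Streams Scala DSL. This is a minimal sketch, assuming Kafka Streams 3.x with the kafka-streams-scala module (TimeWindows.ofSizeWithNoGrace is 3.0+; older versions use TimeWindows.of as in the question). To avoid needing a custom Serde, it assumes the value is just the person's name as a String instead of the question's Person type; the object name WindowedCountTopology and the input topic are hypothetical. The explicit type annotations show the intermediate KGroupedStream and TimeWindowedKStream types the fluent API passes through.

```scala
import java.time.Duration

import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KStream, KTable, TimeWindowedKStream}
import org.apache.kafka.streams.scala.serialization.Serdes._

object WindowedCountTopology {
  // Hypothetical input: key = person id (Int), value = person name (String).
  def build(inputTopic: String): Topology = {
    val builder = new StreamsBuilder()

    val events: KStream[Int, String] = builder.stream[Int, String](inputTopic)

    // 1. Group: choose the key the aggregation is computed per (here: the name).
    val grouped: KGroupedStream[String, String] = events.groupBy((_, name) => name)

    // 2. Window: 10-minute hopping windows advancing every minute.
    val windows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)).advanceBy(Duration.ofMinutes(1))
    val windowed: TimeWindowedKStream[String, String] = grouped.windowedBy(windows)

    // 3. Aggregate: aggregate() adds the processor and its window store itself.
    //    This aggregation simply counts the events per name and window.
    val counts: KTable[Windowed[String], Long] =
      windowed.aggregate(0L)((_, _, count) => count + 1L)

    // Every update to a window's result is forwarded downstream.
    counts.toStream.foreach { (window, count) =>
      println(s"${window.key} in [${window.window.start}, ${window.window.end}): $count")
    }

    builder.build()
  }
}
```

If only the number of events per window is needed, windowed.count() gives the same result; aggregate() is the general form for other per-window statistics. Note that the windowed KTable emits an update for each incoming record rather than one final value per window; KTable.suppress can be used to restrict output to final results.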

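Btw: Kafka Streams does not really support "sliding windows" for aggregation (SlidingWindows were only added later, in Kafka 2.7). The window you define is called a hopping window: it has a fixed size (10 minutes) and advances by a fixed interval (1 minute), so consecutive windows overlap and each record falls into several of them.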
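To make the hopping-window semantics concrete, here is a plain-Scala sketch with no Kafka dependency that lists which windows a record with a given timestamp falls into. It assumes, as the Kafka Streams documentation describes for hopping windows, that windows are aligned to the epoch with an inclusive start and an exclusive end. The helper hoppingWindowsFor and the Window case class are hypothetical illustrations, not Kafka's own classes.

```scala
// Plain-Scala illustration of hopping-window assignment (no Kafka dependency).
// Assumes epoch-aligned windows: start inclusive, end exclusive.
object HoppingWindowSketch {
  // Hypothetical type for illustration; not Kafka's TimeWindow class.
  final case class Window(startMs: Long, endMs: Long)

  // Returns every hopping window of the given size/advance that contains tsMs.
  def hoppingWindowsFor(tsMs: Long, sizeMs: Long, advanceMs: Long): List[Window] = {
    require(tsMs >= 0 && sizeMs > 0 && advanceMs > 0 && advanceMs <= sizeMs)
    // Smallest aligned start whose window [start, start + sizeMs) still contains tsMs.
    val firstStart = math.max(0L, tsMs - sizeMs + advanceMs) / advanceMs * advanceMs
    Iterator
      .iterate(firstStart)(_ + advanceMs)
      .takeWhile(_ <= tsMs) // a window starting after tsMs cannot contain it
      .map(start => Window(start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    val oneMinute = 60 * 1000L
    val windows = hoppingWindowsFor(15 * oneMinute, 10 * oneMinute, oneMinute)
    println(windows.size) // 10
    println(windows.head) // Window(360000,960000)
  }
}
```

With a 10-minute size and a 1-minute advance, a record at minute 15 lands in ten windows, from [6, 16) through [15, 25) minutes, which is why a single input record updates several windowed aggregates at once.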

KGroupedStream and TimeWindowedKStream are basically just helper classes: an intermediate representation that allows for a fluent API design.

The tutorial is also a good way to get started: https://docs.confluent.io/current/streams/quickstart.html

You should also check out the examples: https://github.com/confluentinc/kafka-streams-examples

Upvotes: 1
