Jorge Machado

Reputation: 772

Kafka Streams detecting missing records

I'm building a streaming app with Kafka Streams 2.10 and I'm facing a conceptual issue.

Producer1 sends (Key -> Value): Session1 -> RUNNING

Producer2 sends (Key -> Value): Sessionabc -> RUNNING

Producer1 sends (Key -> Value): Session1 -> DONE

Now I want to detect a dead session, i.e. one that sent RUNNING but never DONE, like Sessionabc above. I'm trying to use a SessionWindow, but because Kafka Streams processes the stream record by record, I cannot evaluate a whole session at once.

Here is my snippet:

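builder
    .stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
    // records of one key belong to the same session unless separated by more than SESSION_DURATION of inactivity
    .windowedBy(SessionWindows.with(SESSION_DURATION))
    .reduce(new SessionReducer())
    // drop the window and keep only the original key
    .toStream((windowed, value) -> windowed.key())
    // null values are tombstones for merged sessions; keep sessions that are still RUNNING
    .filter((k, v) -> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
    .peek((a, b) -> System.out.println("This Value is missing: \n   " + a.toString() + b.toString()));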

Note: the reducer just makes sure that once a DONE is seen for a session, the result stays DONE, regardless of which other records arrive for that session. Any ideas?
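A minimal sketch of such a reducer, assuming a hypothetical `SessionEvent` value with a `getStatus()` method and a `Status` enum with `RUNNING` and `DONE` (only those names appear in the question). To keep it runnable without the Kafka jar, it uses `java.util.function.BinaryOperator`, which has the same `apply(value1, value2)` shape as Kafka Streams' `Reducer<V>`; in the real topology `SessionReducer` would implement `Reducer<SessionEvent>` instead.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

public class SessionReducerSketch {

    // Hypothetical status type; the question only shows RUNNING and DONE.
    enum Status { RUNNING, DONE }

    // Hypothetical value type standing in for the question's SessionEvent value.
    static final class SessionEvent {
        private final Status status;

        SessionEvent(Status status) { this.status = status; }

        Status getStatus() { return status; }
    }

    // Same shape as Kafka Streams' Reducer<V>.apply(value1, value2):
    // if either side is DONE, the result is DONE; otherwise the newer value wins.
    static final BinaryOperator<SessionEvent> DONE_WINS = (aggregate, next) ->
            aggregate.getStatus() == Status.DONE ? aggregate : next;

    public static void main(String[] args) {
        Optional<SessionEvent> result = Stream.of(Status.RUNNING, Status.DONE, Status.RUNNING)
                .map(SessionEvent::new)
                .reduce(DONE_WINS);
        System.out.println(result.get().getStatus()); // prints DONE
    }
}
```

Because the result does not depend on the order of RUNNING and DONE, it also behaves consistently when Kafka Streams merges two session windows.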

Upvotes: 0

Views: 1420

Answers (1)

Bartosz Wardziński

Reputation: 6593

With the Processor API it can be done easily, with a little more code. The DSL can be mixed with the Processor API.

The processing would look like this:

  1. Build a state store and add it using StreamsBuilder::addStateStore.
  2. Create a KStream and call KStream::transform with a Transformer that does the whole work.
  3. The result of transform will be messages saying whether a session is DEAD or DONE.
  4. In the Transformer you implement how each message is processed. For each message you update the key-value store, where the key is the session ID, and save the timestamp of the last message for that session.
  5. Then, in a Punctuator (which is called periodically), you check which sessions have timed out and forward that information using ProcessorContext::forward with the status (DONE or DEAD).
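The steps above can be modelled in plain Java so the logic runs without a Kafka cluster. This is a sketch under stated assumptions, not the linked code: a `TreeMap` stands in for the key-value store, `process()` for the body of `Transformer::transform`, `punctuate(now)` for the Punctuator (in Kafka Streams it would be registered with `ProcessorContext::schedule`), and the returned strings for what would be passed to `ProcessorContext::forward`. The class name, method names, timeout value and `Status` enum are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DeadSessionDetector {

    // Hypothetical status type: RUNNING/DONE come from the input, DEAD is emitted on timeout.
    enum Status { RUNNING, DONE, DEAD }

    // What the store keeps per session: merged status and timestamp of the last record.
    static final class SessionState {
        final Status status;
        final long lastSeenMs;

        SessionState(Status status, long lastSeenMs) {
            this.status = status;
            this.lastSeenMs = lastSeenMs;
        }
    }

    // Stands in for the KeyValueStore (key = session ID); TreeMap only for a deterministic output order.
    private final Map<String, SessionState> store = new TreeMap<>();
    private final long timeoutMs;

    DeadSessionDetector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Per-record logic, i.e. what Transformer::transform would do.
    void process(String sessionId, Status status, long timestampMs) {
        SessionState previous = store.get(sessionId);
        // Keep DONE once it has been seen, like the reducer in the question.
        Status merged = (previous != null && previous.status == Status.DONE) ? Status.DONE : status;
        store.put(sessionId, new SessionState(merged, timestampMs));
    }

    // Periodic logic, i.e. what the Punctuator would do: every session whose last record
    // is at least timeoutMs old is emitted as DONE or DEAD and removed from the store.
    List<String> punctuate(long nowMs) {
        List<String> forwarded = new ArrayList<>();
        Iterator<Map.Entry<String, SessionState>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> entry = it.next();
            SessionState state = entry.getValue();
            if (nowMs - state.lastSeenMs >= timeoutMs) {
                Status result = state.status == Status.DONE ? Status.DONE : Status.DEAD;
                forwarded.add(entry.getKey() + " -> " + result); // ProcessorContext::forward in Kafka Streams
                it.remove();
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        DeadSessionDetector detector = new DeadSessionDetector(30_000);
        detector.process("Session1", Status.RUNNING, 0);
        detector.process("Sessionabc", Status.RUNNING, 1_000);
        detector.process("Session1", Status.DONE, 2_000);
        System.out.println(detector.punctuate(20_000)); // prints []
        System.out.println(detector.punctuate(40_000)); // prints [Session1 -> DONE, Sessionabc -> DEAD]
    }
}
```

The timeout is decided by the punctuator rather than by a window, so a session that never sends DONE is still reported. With `PunctuationType.WALL_CLOCK_TIME` this happens even when no new records arrive, whereas `PunctuationType.STREAM_TIME` only advances as records come in.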

The whole code for doing that can be found here.

Upvotes: 2
