Reputation: 772
I'm building a streaming app with Kafka Streams 2.10 and I'm facing a conceptual issue.
The producer1 sends (Key -> Value): Session1 -> RUNNING
The producer2 sends (Key -> Value): Sessionabc -> RUNNING
The producer1 sends (Key -> Value): Session1 -> DONE
Now I want to detect dead sessions (here, Sessionabc, which never receives a DONE). I tried a SessionWindow, but because Kafka Streams processes record by record, I cannot evaluate all sessions at once.
Here is my snippet:
builder
    .stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .windowedBy(SessionWindows.with(SESSION_DURATION))
    .reduce(new SessionReducer())
    .toStream((windowed, value) -> windowed.key())
    .filter((k, v) -> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
    .peek((k, v) -> System.out.println("This value is missing: \n" + k.toString() + v.toString()));
Note: the reducer just makes sure that once we see a DONE for a session, the result for that session stays DONE regardless of which other elements arrive for it. Any ideas?
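For reference, a minimal sketch of the reducer logic described above, assuming a `SessionEvent` value type with a `getStatus()` accessor (names are illustrative, not the actual classes from the question):

```java
// Hypothetical sketch of the SessionReducer: once a session has been merged
// with a DONE event, the aggregate for that session stays DONE forever.
public class SessionReducerSketch {

    enum Status { RUNNING, DONE }

    static class SessionEvent {
        private final Status status;
        SessionEvent(Status status) { this.status = status; }
        Status getStatus() { return status; }
    }

    // Mirrors Reducer<SessionEvent>#apply(aggregate, newValue):
    // DONE is absorbing; otherwise keep the newest value.
    static SessionEvent reduce(SessionEvent aggregate, SessionEvent newValue) {
        if (aggregate.getStatus() == Status.DONE || newValue.getStatus() == Status.DONE) {
            return new SessionEvent(Status.DONE);
        }
        return newValue;
    }
}
```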
Upvotes: 0
Views: 1420
Reputation: 6593
With the Processor API this can be done easily, at the cost of a little more code, and the DSL can be mixed with the Processor API.

Processing would look like this:

1. StreamsBuilder::addStateStore
2. KStream::transform with a Transformer that does the whole work
3. ProcessorContext::forward with the status (DONE, DEAD)

The whole code showing how to do that can be found here
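The core logic the Transformer would run can be sketched without the Kafka dependencies, so the idea is easy to see. In this simplified model (all names are illustrative), a plain HashMap stands in for the KeyValueStore registered via StreamsBuilder::addStateStore, and detectDead() plays the role of a scheduled Punctuator that would call ProcessorContext::forward for each expired session:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Dependency-free sketch of the dead-session detection a Transformer would do.
// The real implementation would keep lastSeen in a KeyValueStore and schedule
// detectDead() as a Punctuator via ProcessorContext#schedule.
public class DeadSessionDetector {

    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new HashMap<>(); // sessionId -> last event time

    public DeadSessionDetector(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called once per record, like Transformer#transform:
    // DONE closes the session, anything else refreshes its timestamp.
    public void onEvent(String sessionId, String status, long eventTimeMs) {
        if ("DONE".equals(status)) {
            lastSeen.remove(sessionId);
        } else {
            lastSeen.put(sessionId, eventTimeMs);
        }
    }

    // Called periodically, like a Punctuator: every session that has been
    // silent longer than the timeout is reported DEAD and evicted. In the
    // real Transformer this is where ProcessorContext#forward would emit
    // the session downstream with status DEAD.
    public List<String> detectDead(long nowMs) {
        List<String> dead = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= timeoutMs) {
                dead.add(entry.getKey());
                it.remove();
            }
        }
        return dead;
    }
}
```

With the events from the question, Session1 gets RUNNING then DONE (so it is removed), while Sessionabc only ever gets RUNNING and is reported dead once the timeout elapses.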
Upvotes: 2