Reputation: 558
I am new to Kafka streams and have come across a use case where there is a need to detect anomalies in an event stream.
Here is the high level use case:
There is a stream of incoming events in a kafka topic:
=====> E5,E4,E3,E2,E1,E0 =====>
Each event is represented by a domain class called MyEvent.
public class MyEvent {
    private String eventKey;          // same as the Kafka message key
    // ... business fields ...
    private LocalDateTime timestamp;
    private long runningTotal;        // important field
}
There is a steady stream of events being published to the input topic topic-in. The runningTotal increases over time, and its value is set by an external system.
Given this background, I need to detect anomalies in the event stream. The use case is to detect any sudden spike in the runningTotal field for events with the same eventKey, where the spike happens within a configurable duration (say 60 seconds) and exceeds a configurable threshold (say 500).
Example:
t0 (seconds):   eventKey=key1, runningTotal = 10
t60 (seconds):  eventKey=key1, runningTotal = 50      // ok, no sudden spike
t120 (seconds): eventKey=key1, runningTotal = 150000  // not ok, sudden spike
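In other words, a "spike" here means that the increase between two events with the same key inside the window exceeds the threshold. Something like this check (isSpike is only my own illustration of the condition, not existing code):

// my own illustration of the spike condition
boolean isSpike(long earlierTotal, long laterTotal, long threshold) {
    return (laterTotal - earlierTotal) > threshold;
}
// isSpike(10, 50, 500)     -> false : no spike at t60
// isSpike(50, 150000, 500) -> true  : spike at t120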
I think this is a Kafka Streams use case, using SlidingWindows, .filter() and .aggregate(). But I am not able to wrap my head around how to implement this in Java/Spring Boot. Any help would be appreciated.
KStream<String, MyEvent> streamIn = streamsBuilder.stream(TOPIC_IN,
        Consumed.with(Serdes.String(), new JsonSerde<>(MyEvent.class)));

streamIn
        // ..... do some magic here .....
        .to("topic-out");
The output should be published to topic-out.
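Here is a rough sketch of the direction I am imagining, in case it helps clarify the question. The MinMaxAgg helper class, its JsonSerde, the 5-second grace period and the hard-coded duration/threshold values are only my own placeholders; I am also assuming MyEvent has getters and that the Kafka record timestamps correspond to the event time:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.kafka.support.serializer.JsonSerde;

// placeholder aggregate that tracks the smallest and largest runningTotal seen in a window (my own class)
public class MinMaxAgg {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    public MinMaxAgg update(long runningTotal) {
        min = Math.min(min, runningTotal);
        max = Math.max(max, runningTotal);
        return this;
    }

    public long getMin() { return min; }
    public long getMax() { return max; }
    public void setMin(long min) { this.min = min; }
    public void setMax(long max) { this.max = max; }
}

// inside the topology method / bean where streamsBuilder is available:
Duration duration = Duration.ofSeconds(60); // configurable
long threshold = 500L;                      // configurable

JsonSerde<MyEvent> eventSerde = new JsonSerde<>(MyEvent.class);
JsonSerde<MinMaxAgg> aggSerde = new JsonSerde<>(MinMaxAgg.class);

KStream<String, MyEvent> streamIn =
        streamsBuilder.stream(TOPIC_IN, Consumed.with(Serdes.String(), eventSerde));

streamIn
        .groupByKey(Grouped.with(Serdes.String(), eventSerde))
        // every pair of events with the same key that are at most 60 seconds apart falls into a common window
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(duration, Duration.ofSeconds(5)))
        // track the smallest and largest runningTotal observed in each window
        .aggregate(
                MinMaxAgg::new,
                (key, event, agg) -> agg.update(event.getRunningTotal()), // assumes a getter on MyEvent
                Materialized.with(Serdes.String(), aggSerde))
        .toStream()
        // keep only the windows where the jump exceeds the threshold
        .filter((windowedKey, agg) -> agg != null && (agg.getMax() - agg.getMin()) > threshold)
        // unwrap the windowed key so the output key is the plain eventKey again
        .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))
        .to("topic-out", Produced.with(Serdes.String(), aggSerde));

With the example above, the sliding window containing both the t60 and the t120 event would have max - min = 149950 > 500, so only that window would end up on topic-out. Is this roughly the right approach?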
Upvotes: 0
Views: 34