Reputation: 1035
I'm new to Stack Overflow, so forgive me if the question is badly asked. Any help/inspiration is much appreciated!
I'm using Kafka Streams to filter incoming data to my database. The incoming messages look like {"ID":"X","time":"HH:MM"}, plus a few other parameters that are irrelevant in this case. I managed to get a Java application running that reads from a topic and prints out the incoming messages. Now what I want to do is use KTables(?) to group incoming messages with the same ID and then use a session window to group the table into time slots. I want a time window of X minutes continuously running on the time axis.
The first step is of course to get a KTable running that counts incoming messages with the same ID. What I would like should result in something like this:
ID Count
X 1
Y 3
Z 1
that keeps getting updated continuously, so that messages with an outdated timestamp are removed from the table.
I'm not a hundred percent sure, but I think what I want is KTables and not KStreams, am I right? And if this is the proper way to get my desired result, how do I set up the sliding window?
This is the code I use right now. It only reads from a topic and prints the incoming messages.
private static List<String> printEvent(String o) {
    System.out.println(o);
    return Arrays.asList(o);
}

final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(srcTopic)
    .flatMapValues(value -> printEvent(value));
I would like to know what I have to add to achieve the desired output stated above, and where to put it in my code.
Thanks in advance for the help!
Upvotes: 3
Views: 4545
Reputation: 1129
Yes, you need a KTable and a sliding window. I also recommend looking at the watermark feature to handle late-arriving messages. Example:
KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream
    .groupByKey()
    // One-minute tumbling windows; newer releases take a java.time.Duration here instead of a long.
    .windowedBy(TimeWindows.of(60 * 1000L))
    .reduce(/* your adder */, Materialized.as("store1m"));
// where your adder can be as simple as (val, agg) -> agg + val
// for primitive types, or as complex as you need
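To make it clearer what a windowed count per ID actually computes, here is a minimal plain-Java sketch with no Kafka dependency. It is only an illustration: the class name WindowedCountSketch, its methods, and the sample timestamps are made up, and it assumes non-negative millisecond timestamps and fixed (tumbling) windows aligned to the epoch. Kafka Streams does this bucketing for you when you group by key and apply a time window.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: counts messages per ID
// inside fixed-size (tumbling) time windows.
public class WindowedCountSketch {
    private final long windowSizeMs;
    // window start (ms) -> (ID -> number of messages seen in that window)
    private final Map<Long, Map<String, Long>> countsByWindow = new TreeMap<>();

    public WindowedCountSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Start of the window that contains the given timestamp.
    public long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Record one message with the given ID and event timestamp.
    public void add(String id, long timestampMs) {
        countsByWindow
            .computeIfAbsent(windowStart(timestampMs), w -> new TreeMap<>())
            .merge(id, 1L, Long::sum);
    }

    // Counts per ID for the window that contains the given timestamp.
    public Map<String, Long> countsFor(long timestampMs) {
        return countsByWindow.getOrDefault(windowStart(timestampMs), Collections.emptyMap());
    }

    public static void main(String[] args) {
        WindowedCountSketch sketch = new WindowedCountSketch(60_000L);
        sketch.add("X", 5_000L);
        sketch.add("Y", 10_000L);
        sketch.add("Y", 20_000L);
        sketch.add("Z", 30_000L);
        sketch.add("Y", 59_999L);
        sketch.add("X", 60_000L); // falls into the next one-minute window

        System.out.println(sketch.countsFor(0L));      // {X=1, Y=3, Z=1}
        System.out.println(sketch.countsFor(60_000L)); // {X=1}
    }
}
```

Each window keeps its own counts, so once a window is over its numbers stop changing and a new table starts for the next window. That is roughly the "outdated messages drop out" behaviour asked for, at window granularity.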
Upvotes: 2