Reputation: 335
I'm doing a hopping window aggregation on a 4-hour window advancing every 5 minutes. As the hopping windows overlap, I'm getting duplicate keys with different aggregated values.
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60 * 1000L)
How do I eliminate duplicate keys with repeating data, or pick only the keys that hold the latest value?
Upvotes: 4
Views: 5279
Reputation: 15077
Update May 2021: The Kafka Streams API nowadays supports "final" window results via a suppress() operator. See the Kafka Streams documentation as well as the blog post Kafka Streams’ Take on Watermarks and Triggers from March 2019 for details.
After defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.
KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .count()
    // Emit a single, final count per window once the window (plus its grace period) has closed.
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) ->
        sendAlert(windowedUserId.window(), windowedUserId.key(), count));
Original answer (still applies when NOT using the suppress() operator above):
If I understand you correctly, then this is expected behavior. You are not seeing "duplicate" keys, but rather continuous updates for the same key.
Think:
# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...
# With the record cache enabled, you would see something like this.
alice->23, alice->59, alice->100, ...
Take a look at the explanation at http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management, which describes this in more detail. If you want to see fewer "duplicates" per record key, you can increase the size of the record caches (when using the DSL) via cache.max.bytes.buffering aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG in your application's configuration. There's also an interplay with commit.interval.ms.
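For illustration, a minimal configuration sketch; the 10 MB cache size and 30-second commit interval are arbitrary example values, and the application id and bootstrap servers are placeholders:
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-hopping-window-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
// A bigger record cache coalesces more updates per key before they are forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Caches are also flushed on every commit, so a longer commit interval means fewer emitted updates.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);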
If you are wondering "why does the Kafka Streams API behave in this way in the first place", I'd recommend the blog post https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ that was published earlier this week.
Upvotes: 9
Reputation: 605
When using hopping windows, multiple time windows exist for one key at the same time. When new records arrive, the aggregation updates all of these overlapping windows at once, so what look like duplicate records are emitted to the downstream topic when toStream() is applied.
To get the result of only the latest time window, you can apply a filter() that keeps just the change-log entry for the latest window. Here is an example describing how to get the latest window aggregation result when using hopping windows:
hopping window aggregation with latest window result
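A minimal sketch of that filtering idea, assuming the 4-hour window advancing every 5 minutes from the question, and a hypothetical aggregate class MyAggregate whose latestTimestamp() returns the newest event time that went into the aggregate:
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Windowed;

final long advanceMs = Duration.ofMinutes(5).toMillis();

// MyAggregate and its latestTimestamp() accessor are hypothetical; substitute your own aggregate type.
KTable<Windowed<String>, MyAggregate> windowedTable = ...;
windowedTable
    .toStream()
    .filter((windowedKey, agg) -> {
        // The newest hopping window containing an event with timestamp t is the one
        // whose start is the last hop boundary at or before t (windows are epoch-aligned).
        long latestStart = (agg.latestTimestamp() / advanceMs) * advanceMs;
        return windowedKey.window().start() == latestStart;
    })
    .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg)); // drop the window from the key
Among the updates triggered by a single record, only the entry for the newest window passes this filter, so each record yields one result per key.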
Upvotes: 0
Reputation: 91
In addition to what Michael wrote, in hopping windows there's another layer of 'duplication'. As the windows are overlapping, the value emitted from subsequent windows can potentially be identical.
For example, let's say you have a five-minute window with a one-minute hop: {0..5}, {1..6}, {2..7}, and so on. A given record from an input topic may therefore belong to several different time windows.
This is in contrast to tumbling windows, where the windows are non-overlapping, so every record is part of exactly one window. Unfortunately, tumbling windows do not fit all use cases; an example is an aggregation where two records with the same key fall on the edges of two subsequent windows.
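For comparison, in the DSL a tumbling window is just a hopping window whose advance equals its size (a sketch with example durations):
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hopping: 5-minute windows advancing by 1 minute => overlapping windows,
// so each record belongs to up to 5 windows.
TimeWindows hopping = TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1));

// Tumbling: no advanceBy() => advance equals size, windows never overlap,
// so each record belongs to exactly one window.
TimeWindows tumbling = TimeWindows.of(Duration.ofMinutes(5));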
When using hopping windows, there are several ways to 'deduplicate'. One way is to 'dedup' downstream. Another way is to do it in Kafka Streams, but this is relevant only for specific topologies. As explained, these results are not real duplicates, but results for successive windows. If you want only the result of the last window for a certain key, you can write something like:
windowedKtable
    // Re-key from the windowed key back to the plain record key.
    .toStream((windowedKey, value) -> windowedKey.key())
    .groupByKey()
    // Keep whichever value reflects the more recent activity for the key.
    .reduce((value1, value2) -> value1.latestActivity() > value2.latestActivity() ? value1 : value2);
I wouldn't say it's a best practice, just a way to overcome the issue in very specific circumstances.
More about windowing in Kafka Streams: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing
Upvotes: 9