Reputation: 25
While reading the suppress() documentation, I saw that the time window will not advance unless records are being published to the topic, because it is event-time driven. Right now my code outputs the final value for each key because traffic on the topic is constant, but there are downtimes when that system is brought down, causing the existing records in the state store to be "frozen". I was wondering what the difference is between having just reduce() versus reduce().suppress(). Does reduce() behave like suppress() in that both are event-time driven? My understanding is that both do the same thing: aggregate the keys within a certain time window.
My topology is the following:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde<EligibilityKey> keySpecificAvroSerde = new SpecificAvroSerde<EligibilityKey>();
keySpecificAvroSerde.configure(serdeConfig, true);
final Serde<Eligibility> valueSpecificAvroSerde = new SpecificAvroSerde<Eligibility>();
valueSpecificAvroSerde.configure(serdeConfig, false);
// KStream<EligibilityKey, Eligibility>
KStream<EligibilityKey, Eligibility> kStreamInput = builder.stream(input,
Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde));
// KStream<EligibilityKey, String>
KStream<EligibilityKey, String> kStreamMapValues = kStreamInput
.mapValues((key, value) -> Processor.process(key, value));
// WindowBytesStoreSupplier
WindowBytesStoreSupplier windowBytesStoreSupplier = Stores.inMemoryWindowStore("in-mem",
Duration.ofSeconds(retentionPeriod), Duration.ofSeconds(windowSize), false);
// Materialized
Materialized<EligibilityKey, String, WindowStore<Bytes, byte[]>> materialized = Materialized
		.<EligibilityKey, String>as(windowBytesStoreSupplier)
		.withKeySerde(keySpecificAvroSerde)
		.withValueSerde(Serdes.String());
// TimeWindows
TimeWindows timeWindows = TimeWindows.of(Duration.ofSeconds(size)).advanceBy(Duration.ofSeconds(advance))
.grace(Duration.ofSeconds(afterWindowEnd));
// KTable<Windowed<EligibilityKey>, String>
KTable<Windowed<EligibilityKey>, String> kTable = kStreamMapValues
.groupByKey(Grouped.with(keySpecificAvroSerde, Serdes.String())).windowedBy(timeWindows)
.reduce((a, b) -> b, materialized.withLoggingDisabled().withRetention(Duration.ofSeconds(retention)))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()));
// KStream<Windowed<EligibilityKey>, String>
KStream<Windowed<EligibilityKey>, String> kStreamOutput = kTable.toStream();
Upvotes: 1
Views: 2246
Reputation: 1418
By using reduce() without suppress(), the result of the aggregation is updated continuously, i.e., updates to the KTable that holds the results of the reduce() are sent downstream even before all records of a window have been processed. Assume a reduce that just sums up the values in a window of duration 3 with grace 0: every incoming record (key, value, timestamp) then immediately produces an updated sum for its window downstream.
With reduce().suppress(), the results are buffered until the window closes, and only one final result per key and window is emitted downstream.
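To make the difference concrete, here is a plain-Java sketch of the two behaviors. It does not use the Kafka Streams API; the records (key A, values 1–4, timestamps 0–3), the tumbling window of size 3 with grace 0, and the summing reduce are all made up for illustration, and the cache is assumed to be disabled:

```java
import java.util.*;

// Simulates a windowed sum-reduce with and without suppress().
// Hypothetical input records; tumbling window size 3, grace 0.
public class SuppressDemo {
    record Rec(String key, int value, long ts) {}

    // Without suppress(): every input record immediately emits the
    // updated aggregate of its window (cache.max.bytes.buffering = 0).
    static List<String> withoutSuppress(List<Rec> input) {
        Map<String, Integer> agg = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            long windowStart = (r.ts / 3) * 3;
            String windowedKey = r.key + "@" + windowStart;
            int sum = agg.merge(windowedKey, r.value, Integer::sum);
            out.add(windowedKey + "=" + sum);
        }
        return out;
    }

    // With suppress(untilWindowCloses): a window's result is emitted only
    // once stream time (max timestamp seen) reaches window end + grace.
    static List<String> withSuppress(List<Rec> input) {
        Map<String, Integer> agg = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            long windowStart = (r.ts / 3) * 3;
            agg.merge(r.key + "@" + windowStart, r.value, Integer::sum);
            streamTime = Math.max(streamTime, r.ts);
            // Emit and evict every window that is now closed.
            Iterator<Map.Entry<String, Integer>> it = agg.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Integer> e = it.next();
                long start = Long.parseLong(
                        e.getKey().substring(e.getKey().indexOf('@') + 1));
                if (start + 3 <= streamTime) {
                    out.add(e.getKey() + "=" + e.getValue());
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("A", 1, 0), new Rec("A", 2, 1),
                new Rec("A", 3, 2), new Rec("A", 4, 3));
        System.out.println(withoutSuppress(input)); // [A@0=1, A@0=3, A@0=6, A@3=4]
        System.out.println(withSuppress(input));    // [A@0=6]
    }
}
```

Note the last point this illustrates: the suppressed window [3,6) never emits, because no later record arrives to advance stream time past its end. That is exactly the "frozen" behavior described in the question.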
Note that for the case without suppress() I assumed that the cache is switched off with cache.max.bytes.buffering = 0. With cache.max.bytes.buffering > 0 (default is 10MB), the cache will buffer output records of a KTable and, once the cache is full, it will output the record with the key that was least recently updated.
Upvotes: 0