Reputation: 1
I have a Kafka Streams application where I read from an input topic, aggregate over a 15-minute window, suppress until the window closes, and then perform an operation on each record. Here is the code:
message
    .mapValues((key, val) -> val.getId())
    .groupByKey(Grouped.with(new Serdes.StringSerde(), new Serdes.StringSerde()))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(windowSize)))
    .count(Named.as("windowed-count"))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream(Named.as("aggregated-stream"))
    .foreach((k, v) -> {
        String id = k.key();
        METRICS.recordItemCount("FilteredOutputUniqueIngestionMessage");
        // The heavy work is handed off to executorService, so foreach returns immediately.
        CompletableFuture.runAsync(() -> {
            long ingestionStartTime = System.currentTimeMillis();
            Data data = dataIngestor.ingest(id).toCompletableFuture().join();
            if (data != null && StringUtils.isNotBlank(data.getId())) {
                // Distinct name: a lambda cannot redeclare a local variable ("id")
                // from the enclosing scope, so reusing "id" here would not compile.
                String ingestedId = data.getId();
                Signal signal = Signal.builder()
                        .id(ingestedId)
                        .source("store-eligible-ad-event")
                        .tenant("TENANT")
                        .language(LANGUAGE_ENGLISH)
                        .timestamp(System.currentTimeMillis())
                        .build();
                try {
                    String kafkaMessage = messageConvertor.toStringMessage(signal);
                    emitterService.send("StoreEmitter-out-0", ingestedId, kafkaMessage);
                } catch (JsonProcessingException e) {
                    log.error("Error occurred in sending store index feed to kafka for {} with ex {}", ingestedId, e.toString());
                    METRICS.recordExceptionCount("IngestorKafkaFeedException_JsonProcessingException");
                }
            }
            METRICS.recordExecutionTime("StoreIngestionTime", System.currentTimeMillis() - ingestionStartTime);
        }, executorService);
    });
In the above code, I am not seeing any rebalancing happening in the consumers, although my max.poll.interval.ms is set to 5 minutes (the default value). Does this mean that Kafka is committing offsets even before reaching the foreach block?
Upvotes: 0
Views: 36
Reputation: 62285
Does this mean that Kafka is committing offsets even before reaching the foreach block?
It's not an accurate way to think about it, but yes.
Note that the code you write is a description of a dataflow program that is executed in the background. The Kafka Streams runtime reads records from the input topic(s) and pipes them through the dataflow program. During this "infinite execution loop", Kafka Streams commits offsets based on the commit.interval.ms config.
Also note that your foreach returns almost immediately, because the actual work is dispatched to executorService via CompletableFuture.runAsync. As far as the runtime is concerned, each record is fully processed at that point, so the poll loop is never blocked long enough to exceed max.poll.interval.ms and trigger a rebalance.
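For illustration, a minimal sketch of where that config is set; the application id, bootstrap servers, and the topology variable are placeholders, not taken from your code:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-ingestion-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
// Offsets are committed on this cadence (default 30000 ms with at-least-once
// processing), regardless of what any downstream foreach is doing.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000");

KafkaStreams streams = new KafkaStreams(topology, props); // topology = the dataflow built above
streams.start();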
Upvotes: 0