Ashutosh Singh

Reputation: 1

Kafka Streams Window Size (15 mins) is greater than the max poll interval (5 min)

I have a Kafka Streams application where I read from an input topic, aggregate over a 15-minute window, suppress the results until the window closes, and then perform some operation on each record. The following is the code:

message
        .mapValues((key, val) -> val.getId())
        .groupByKey(Grouped.with(new Serdes.StringSerde(), new Serdes.StringSerde()))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(windowSize)))
        .count(Named.as("windowed-count"))
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream(Named.as("aggregated-stream"))
        .foreach((k, v) -> {
            String id = k.key();
            METRICS.recordItemCount("FilteredOutputUniqueIngestionMessage");
            // Ingestion runs asynchronously on executorService, so foreach
            // returns immediately and does not block the Streams thread.
            CompletableFuture.runAsync(() -> {
                long ingestionStartTime = System.currentTimeMillis();
                Data data = dataIngestor.ingest(id).toCompletableFuture().join();
                if (data != null && StringUtils.isNotBlank(data.getId())) {
                    // Use a distinct name; redeclaring `id` here would shadow
                    // the outer variable and fail to compile.
                    String dataId = data.getId();
                    Signal signal = Signal.builder()
                            .id(dataId)
                            .source("store-eligible-ad-event")
                            .tenant("TENANT")
                            .language(LANGUAGE_ENGLISH)
                            .timestamp(System.currentTimeMillis())
                            .build();
                    try {
                        String kafkaMessage = messageConvertor.toStringMessage(signal);
                        emitterService.send("StoreEmitter-out-0", dataId, kafkaMessage);
                    } catch (JsonProcessingException e) {
                        log.error("Error occurred in sending store index feed to kafka for {} with ex {}", dataId, e.toString());
                        METRICS.recordExceptionCount("IngestorKafkaFeedException_JsonProcessingException");
                    }
                }
                METRICS.recordExecutionTime("StoreIngestionTime", System.currentTimeMillis() - ingestionStartTime);
            }, executorService);
        });

In the above code, I am not seeing any rebalancing happening in the consumers, even though max.poll.interval.ms is set to 5 minutes (the default value). Does this mean that Kafka is committing offsets even before reaching the foreach block?

Upvotes: 0

Views: 36

Answers (1)

Matthias J. Sax

Reputation: 62285

Does this mean that Kafka is committing offsets even before reaching the foreach block?

It's not an accurate way to think about it, but yes.

Note that the code you write is a description of a dataflow program that is executed in the background. The Kafka Streams runtime reads data from the input topic(s) and pipes the records through the dataflow program. During this "infinite execution loop", Kafka Streams commits offsets based on the commit.interval.ms config, not based on when a record reaches any particular operator (such as your foreach).
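As a minimal sketch of the two configs in play (the class name, application id, and bootstrap servers below are placeholders, not from your setup):

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-aggregator");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // Offsets are committed on this cadence (default 30s for at-least-once,
        // 100ms for exactly-once), independent of the 15-minute window size.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Duration.ofSeconds(30).toMillis());

        // max.poll.interval.ms only bounds the time between consecutive poll()
        // calls of the runtime's consumer; an open 15-minute window does not
        // delay poll(), so it does not trigger a rebalance by itself.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000);
        return props;
    }
}

In particular, since your foreach only schedules work on executorService and returns right away, the poll loop is never blocked, which is consistent with you not seeing any rebalances.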

Upvotes: 0
