Vinodhini Chockalingam
Vinodhini Chockalingam

Reputation: 326

Is there a way to get committed offset in EOS Kafka stream?

Background :

Setting consumer interceptor to StreamsConfig will ensure that the interceptor(s) are called when messages are consumed/committed. Snippet from org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync

       if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

But the consumerInterceptor.onCommit() was never called even though I saw the offsets being committed at the source topic.

Issue:

I figured that it was because I was using kstreams with Exactly once processing guarantee enabled.

This was the logic at org.apache.kafka.streams.processor.internals.StreamTask#commit

        if (this.eosEnabled) {
            this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
            this.producer.commitTransaction();
            if (startNewTransaction) {
                this.producer.beginTransaction();
            }
        } else {
            this.consumer.commitSync(consumedOffsetsAndMetadata);
        }

As you can see, consumer.commitSync which in turns calls the consumerCoordinator.commit which calls the interceptor.onCommit, never gets called because with eos enabled, it is the transaction api that gets invoked.

Question: Is there a way I can hook a callback to kstream when offsets are committed at the source topic with eos enabled?

Upvotes: 0

Views: 329

Answers (0)

Related Questions