Background:
Setting a consumer interceptor in StreamsConfig should ensure that the interceptor(s) are called when messages are consumed or committed. Snippet from org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync:
    if (future.succeeded()) {
        if (interceptors != null)
            interceptors.onCommit(offsets);
        return true;
    }
But consumerInterceptor.onCommit() was never called, even though I could see the offsets being committed for the source topic.
Issue:
I figured out that this was because I was using Kafka Streams with the exactly-once processing guarantee (EOS) enabled.
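For context, here is a minimal sketch of the kind of configuration being described. It uses plain java.util.Properties with the literal config keys so it runs without the Kafka jars; the application id, broker address, and the interceptor class name com.example.MyCommitInterceptor are hypothetical placeholders, not values from my actual setup.

```java
import java.util.Properties;

public class StreamsSetupSketch {
    // Builds the properties that would be handed to StreamsConfig / KafkaStreams.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        // Exactly-once processing: this is the setting that changes the commit path.
        props.put("processing.guarantee", "exactly_once");
        // The "consumer." prefix routes the setting to the embedded consumer.
        props.put("consumer.interceptor.classes",
                  "com.example.MyCommitInterceptor");       // hypothetical class
        return props;
    }

    public static void main(String[] args) {
        buildProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In real code the same keys are available as constants, for example StreamsConfig.PROCESSING_GUARANTEE_CONFIG and StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG).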
This is the logic in org.apache.kafka.streams.processor.internals.StreamTask#commit:
    if (this.eosEnabled) {
        this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
        this.producer.commitTransaction();
        if (startNewTransaction) {
            this.producer.beginTransaction();
        }
    } else {
        this.consumer.commitSync(consumedOffsetsAndMetadata);
    }
As you can see, consumer.commitSync (which in turn calls consumerCoordinator.commit, which calls interceptor.onCommit) never gets called, because with EOS enabled it is the producer's transaction API that gets invoked instead.
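To make the two paths concrete, here is a small self-contained simulation of the branching. FakeConsumer, FakeProducer, and CommitListener are hypothetical stand-ins I wrote for illustration, not Kafka classes; they only mimic the shape of the calls in the snippets quoted here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CommitPathSketch {
    /** Hypothetical stand-in for ConsumerInterceptor#onCommit. */
    interface CommitListener {
        void onCommit(Map<String, Long> offsets);
    }

    /** Stand-in consumer: commitSync notifies the listener, like the coordinator does. */
    static class FakeConsumer {
        private final CommitListener listener;
        FakeConsumer(CommitListener listener) { this.listener = listener; }
        void commitSync(Map<String, Long> offsets) {
            if (listener != null) listener.onCommit(offsets);
        }
    }

    /** Stand-in producer: offsets go through the transaction, no listener involved. */
    static class FakeProducer {
        final List<Map<String, Long>> sent = new ArrayList<>();
        void sendOffsetsToTransaction(Map<String, Long> offsets, String groupId) {
            sent.add(offsets);
        }
        void commitTransaction() { /* nothing to notify here */ }
    }

    /** Mirrors the if/else in the commit logic above. */
    static void commit(boolean eosEnabled, FakeConsumer consumer,
                       FakeProducer producer, Map<String, Long> offsets) {
        if (eosEnabled) {
            producer.sendOffsetsToTransaction(offsets, "my-streams-app");
            producer.commitTransaction();
        } else {
            consumer.commitSync(offsets);
        }
    }

    /** Returns how many times the listener fired for one commit. */
    static int callbacksFor(boolean eosEnabled) {
        List<Map<String, Long>> seen = new ArrayList<>();
        FakeConsumer consumer = new FakeConsumer(seen::add);
        commit(eosEnabled, consumer, new FakeProducer(),
               Collections.singletonMap("source-topic-0", 42L));
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("callbacks without EOS: " + callbacksFor(false));
        System.out.println("callbacks with EOS:    " + callbacksFor(true));
    }
}
```

In this sketch the listener fires once on the non-EOS path and not at all on the EOS path, which matches what I observe with the real interceptor.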
Question: Is there a way to hook a callback into Kafka Streams that fires when offsets are committed for the source topic with EOS enabled?