gdahugo

Reputation: 103

When will Kafka retry processing the messages that have not been acknowledged?

I have a consumer configured with manual acknowledgment mode:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageAvro> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, MessageAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

And a consumer with a @KafkaListener method that does some work, like:

@KafkaListener(
        topics = "${tpd.topic-name}",
        containerFactory = "kafkaListenerContainerFactory",
        groupId = "${tpd.group-id}")
public void messageListener(final ConsumerRecord<String, MessageAvro> msg,
                            @Payload final MessageAvro message,
                            final Acknowledgment ack) {
    if (someCondition) {
        // do something
        ack.acknowledge();
    } else {
        // do not acknowledge the message here in order to retry it later
    }
}

In the case where the condition is false and we move on to the "else" branch, when will my consumer try to read the unacknowledged message again?

And if it doesn't read it again on its own, how do I tell my @KafkaListener to take the unacknowledged messages into account?

Upvotes: 2

Views: 2221

Answers (1)

Michael Heil

Reputation: 18515

As soon as you commit (or "acknowledge") an offset, all previous offsets are also committed, in the sense that the consumer group will not try to read them again.

That means: if you hit the "else" branch for one record and your job later hits the "if" branch (with its acknowledgment) for a subsequent record, all offsets up to that point are committed, including the one you skipped.
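To make that cumulative behavior concrete, here is a minimal plain-KafkaConsumer sketch (not the Spring listener from the question; the broker address, topic, and group id are made up) that skips some records and commits others:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CumulativeCommitDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "demo-group");              // made-up group id
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic")); // made-up topic
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (final ConsumerRecord<String, String> record : records) {
                    if (record.offset() % 2 == 0) { // stand-in for "someCondition"
                        // Committing offset N+1 marks everything up to N as read,
                        // including any odd-offset record we skipped above.
                        consumer.commitSync(Map.of(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    }
                    // Skipped records are NOT redelivered: the next poll() continues
                    // from the in-memory position, and after a restart the consumer
                    // resumes from the last committed offset, which is already past them.
                }
            }
        }
    }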

The reason behind this is that a Kafka consumer reports back to the brokers which offset to read next. To achieve this, Kafka stores that information in an internal topic called __consumer_offsets as a key/value pair, where the key is (consumer group, topic name, partition) and the value is the next offset to read.

That internal topic is a compacted topic, which means it eventually keeps only the latest value for a given key. As a consequence, Kafka does not track the "un-acknowledged" messages in between.
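You can inspect what Kafka has stored for your group with the AdminClient. A small sketch, assuming a broker at localhost:9092 and a made-up group id:

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ShowCommittedOffsets {
        public static void main(final String[] args) throws ExecutionException, InterruptedException {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

            try (AdminClient admin = AdminClient.create(props)) {
                // Reads the (group, topic, partition) -> next-offset mapping that
                // Kafka materializes from the __consumer_offsets topic.
                final Map<TopicPartition, OffsetAndMetadata> offsets = admin
                        .listConsumerGroupOffsets("my-group") // made-up group id
                        .partitionsToOffsetAndMetadata()
                        .get();
                offsets.forEach((tp, om) ->
                        System.out.printf("%s-%d -> next offset to read: %d%n",
                                tp.topic(), tp.partition(), om.offset()));
            }
        }
    }

Note that there is exactly one "next offset to read" per partition and group; nothing in this mapping can represent a single skipped message behind that offset.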

Workaround

What people usually do is fork those "un-acknowledged" messages into another topic so they can be inspected and consumed together at a later point in time. That way, you do not block your actual application from consuming further messages, and you can deal with the un-acknowledged messages separately.
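Sketched against the listener from the question, that workaround could look roughly like this (the KafkaTemplate bean, the someCondition check, and the retry topic name are assumptions, not part of the original code):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class ForkingListener {

        // Assumed to be configured elsewhere with matching serializers.
        private final KafkaTemplate<String, MessageAvro> kafkaTemplate;

        public ForkingListener(final KafkaTemplate<String, MessageAvro> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(
                topics = "${tpd.topic-name}",
                containerFactory = "kafkaListenerContainerFactory",
                groupId = "${tpd.group-id}")
        public void messageListener(final ConsumerRecord<String, MessageAvro> msg,
                                    final Acknowledgment ack) {
            if (someCondition(msg)) {
                // do something with the record
            } else {
                // Fork the record to a separate topic for later inspection and
                // reprocessing ("my-topic.retry" is a made-up name).
                kafkaTemplate.send("my-topic.retry", msg.key(), msg.value());
            }
            // Acknowledge in both branches so the main consumer keeps progressing.
            ack.acknowledge();
        }

        private boolean someCondition(final ConsumerRecord<String, MessageAvro> msg) {
            return true; // placeholder for the real business check
        }
    }

A separate consumer can then drain the retry topic on its own schedule, without holding up the main topic's consumer group.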

Upvotes: 3
