Renato Garcia
Renato Garcia

Reputation: 1221

Spring Kafka commit reset offset not work when application goes down

As described in the documentation, an offset should only be committed when I actually commit (When AckMode.MANUAL_IMMEDIATE or AckMode.MANUAL) or at the end of the listener execution when AckMode.RECORD, however, in the middle of processing the method annotated with @KafkaListener the application goes down, the message is not re-delivered, the application starts reading from the next valid message and this current message is lost (message that was being processed when the application was restarted), how do I achieve the goal of the application reprocessing an uncommitted message when the application is restarted in the middle of processing? I have also tried configuring AUTO_OFFSET_RESET_CONFIG as earliest, latest and none without success in the 3 models. For testing purposes, I created a topic with just one partition, I forced the listener to use the container factory that I define manually. The springboot-version 2.2.6

    @Configuration
    class KafkaTestConfiguration {

        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfigs())
        }

        @Bean
        fun consumerFactory(): ConsumerFactory<Any, Any> {
            return DefaultKafkaConsumerFactory(consumerConfigs())
        }

        @Bean
        fun producerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return props
        }

        @Bean
        fun consumerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
            props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
            props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
            return props
        }

        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory())
        }

  @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = consumerFactory()
        factory.consumerFactory.createConsumer()
        val containerProperties = factory.containerProperties
        containerProperties.isAckOnError = false
        containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
        containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
        containerProperties.isLogContainerConfig = true
        return factory
    }

    @Component
    class KafkaListenerAck {
        @KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
                groupId = "kafka-retry",
                concurrency = "1",
                containerFactory = "kafkaListenerContainerFactory"
        )
        fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
            println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
            println("If stop container here, the next pool will not deliver the current unconfirmed message")
            acknowledgment?.acknowledge()
        }
    }

Upvotes: 0

Views: 830

Answers (1)

Gary Russell
Gary Russell

Reputation: 174689

The offset will not be committed until the acknowledgment.acknowledge() is called. Set the commitLogLevel container property to DEBUG to see commit activity.

auto.offset.reset only applies if the consumer has never committed an offset (new consumer groups only).

If you can't figure it out from the log; edit the question with the log snippet.

Upvotes: 1

Related Questions