Reputation: 1221
As described in the documentation, an offset should only be committed when I actually commit (When AckMode.MANUAL_IMMEDIATE or AckMode.MANUAL) or at the end of the listener execution when AckMode.RECORD, however, in the middle of processing the method annotated with @KafkaListener the application goes down, the message is not re-delivered, the application starts reading from the next valid message and this current message is lost (message that was being processed when the application was restarted), how do I achieve the goal of the application reprocessing an uncommitted message when the application is restarted in the middle of processing? I have also tried configuring AUTO_OFFSET_RESET_CONFIG as earliest, latest and none without success in the 3 models. For testing purposes, I created a topic with just one partition, I forced the listener to use the container factory that I define manually. The springboot-version 2.2.6
@Configuration
class KafkaTestConfiguration {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfigs())
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun producerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun consumerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = consumerFactory()
factory.consumerFactory.createConsumer()
val containerProperties = factory.containerProperties
containerProperties.isAckOnError = false
containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
containerProperties.isLogContainerConfig = true
return factory
}
@Component
class KafkaListenerAck {
@KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
groupId = "kafka-retry",
concurrency = "1",
containerFactory = "kafkaListenerContainerFactory"
)
fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
println("If stop container here, the next pool will not deliver the current unconfirmed message")
acknowledgment?.acknowledge()
}
}
Upvotes: 0
Views: 830
Reputation: 174689
The offset will not be committed until the acknowledgment.acknowledge()
is called. Set the commitLogLevel
container property to DEBUG
to see commit activity.
auto.offset.reset
only applies if the consumer has never committed an offset (new consumer groups only).
If you can't figure it out from the log; edit the question with the log snippet.
Upvotes: 1