Reputation: 11
My requirement is to reset the offset of a kafka topic when the application has failed to process the message read from current offset of kafka topic which is consumed through a spring boot java application. After resetting the offset manually or sending a negative acknowledgement the messages needs to be polled again from the uncomitted offset value through the kafka consumer of spring boot java application. Can it be achieved?
Upvotes: 1
Views: 831
Reputation: 20860
That is the issue with Spring Kafka versions < 2.0.1 where the failed records are lost and Listener continues reading from next offset.
If you are using Spring kafka 2.0.1+, you can use SeekToCurrentErrorHandler
which will cause the failed and unprocessed records to be sent in the next poll.
To configure the listener container with the above handler, you need to add it into ContainerProperties
Here is an example of KafkaListener container factory
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(this.consumerFactory);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
Here is the detailed documentation :
https://docs.spring.io/spring-kafka/reference/html/_reference.html#_seek_to_current_container_error_handlers
Upvotes: 1