Rahul Goyal

Reputation: 11

How can we manually reset the offset of a Kafka topic that is consumed by a Spring Boot Java application?

My requirement is to reset the offset of a Kafka topic when my Spring Boot Java application fails to process the message read from the current offset. After resetting the offset manually, or sending a negative acknowledgement, the messages need to be polled again from the uncommitted offset by the application's Kafka consumer. Can this be achieved?

Upvotes: 1

Views: 831

Answers (1)

Nishu Tayal

Reputation: 20860

This is an issue in Spring Kafka versions before 2.0.1: failed records are lost and the listener continues reading from the next offset.

If you are using Spring Kafka 2.0.1 or later, you can use SeekToCurrentErrorHandler, which causes the failed record and the unprocessed records after it to be delivered again in the next poll.
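To make the redelivery behaviour concrete, here is a minimal plain-Java simulation of the seek-to-current idea. It does not use Kafka or Spring classes, and it is not how SeekToCurrentErrorHandler is implemented; FakeConsumer, consume, failOnce and maxPoll are hypothetical names invented for this sketch. It assumes a single partition where the list index stands in for the offset, and a record that fails only on its first delivery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java simulation of seek-to-current redelivery (no Kafka or Spring classes). */
class SeekToCurrentSketch {

    /** Hypothetical stand-in for a consumer assigned to one partition. */
    static class FakeConsumer {
        private final List<String> log; // list index plays the role of the offset
        private int position = 0;       // next offset to fetch

        FakeConsumer(List<String> log) {
            this.log = log;
        }

        /** Returns up to max records from the current position and advances past them. */
        List<String> poll(int max) {
            int end = Math.min(log.size(), position + max);
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }

        void seek(int offset) {
            position = offset;
        }

        int position() {
            return position;
        }
    }

    /**
     * Consumes the whole log. Each record in failOnce fails on its first delivery only.
     * Returns the records that were processed successfully, in order.
     */
    static List<String> consume(List<String> log, Set<String> failOnce, int maxPoll) {
        FakeConsumer consumer = new FakeConsumer(log);
        Set<String> pendingFailures = new HashSet<>(failOnce);
        List<String> processed = new ArrayList<>();
        while (consumer.position() < log.size()) {
            int batchStart = consumer.position();
            List<String> batch = consumer.poll(maxPoll);
            for (int i = 0; i < batch.size(); i++) {
                String record = batch.get(i);
                if (pendingFailures.remove(record)) {
                    // Simulated processing failure: rewind to the failed record's
                    // offset and drop the rest of the batch, so the next poll
                    // returns the failed record and everything after it.
                    consumer.seek(batchStart + i);
                    break;
                }
                processed.add(record);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        Set<String> failOnce = new HashSet<>(Arrays.asList("b"));
        System.out.println(consume(log, failOnce, 3)); // prints [a, b, c, d]
    }
}
```

In this run the first poll returns a, b and c; b fails, so the position is moved back to offset 1 and the second poll returns b, c and d. Without the seek, the position would stay at offset 3 and b would never be delivered again, which is the record loss described above.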

To configure the listener container with SeekToCurrentErrorHandler, set it on the ContainerProperties.

Here is an example of a KafkaListener container factory:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(this.consumerFactory);
    // Do not commit the offset of a record whose listener threw an exception
    factory.getContainerProperties().setAckOnError(false);
    // Seek back so the failed record is delivered again in the next poll
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    // Commit the offset after each record has been processed
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}

Here is the detailed documentation:
https://docs.spring.io/spring-kafka/reference/html/_reference.html#_seek_to_current_container_error_handlers

Upvotes: 1
