CuriousDev
CuriousDev

Reputation: 311

How & when the Kafka message will be redelivered if manual acknowledgment not sent due to error processing the message at consumer ?

I am using spring cloud stream version 1.1.2 to create /integrate consumer with microservice. I am setting auto-commit-offset consumer property to False so that i can receive acknowledgment header in Message and manually acknowledge messages once it is consumed successfully.

My concern is, if something fails during message consumption i will not send acknowledgment back to broker but when i can expect the same message re-delivered to consumer. Currently, i can verify re-delivery only if i restart server, how would it work when server is already up and running.

Consumer properties are set as

kafka:
        bindings:
          input:
            consumer:
              auto-commit-offset: false
              reset-offsets: true
              start-ofofset: earliest

Upvotes: 2

Views: 1998

Answers (1)

Gerrit
Gerrit

Reputation: 435

You need to seek the offset before the message in you client. The offset is kept persistent at the kafka service for your group and at your client in memory. The latter will be lost when the service is being restarted which is why you then again consume your message.

This can be solved by:

public class KafkaConsumer implements ConsumerSeekAware {

and

        this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());

I hope this helps you!

Complete consumer code:

public class KafkaConsumer implements ConsumerSeekAware {

private static final String USER_TOPIC = "user-topic";
private static final String USER_LISTENER_ID = "userListener";
private static final String STRING_LISTENER = "string-listener";

private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

private final KafkaListenerEndpointRegistry registry;

private final TaskScheduler scheduler;

private final LocalValidatorFactoryBean validatorFactory;


public KafkaConsumer(final KafkaListenerEndpointRegistry registry, final TaskScheduler scheduler, final LocalValidatorFactoryBean validatorFactory) {
    this.registry = registry;
    this.scheduler = scheduler;
    this.validatorFactory = validatorFactory;


}

public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback) {
    this.seekCallBack.set(callback);
}

@Override
public void onPartitionsAssigned(final Map<TopicPartition, Long> assignments, final ConsumerSeekCallback callback) {

}

@Override
public void onIdleContainer(final Map<TopicPartition, Long> assignments, final ConsumerSeekCallback callback) {

}


@KafkaListener(id = USER_LISTENER_ID, topics = USER_TOPIC, containerFactory = "userContainerFactory")
public void consumeJson(ConsumerRecord<String, User> consumerRecord, User user, final Acknowledgment acknowledgment) {

   if (user.getName().equals("reject")) {
        throw new IllegalStateException("Illegal user:" + user.getName());
    }


    if (!user.getName().equals("retry")) {
        acknowledgment.acknowledge();
        log.info("Consumed JSON Message: " + user);
    } else {
        log.info("Rejected: " + user);
        this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
    }


}

Upvotes: 3

Related Questions