ben seal

Reputation: 31

Retry Kafka Message Consumption on Database Failure

I am relatively new to Kafka.

We process messages from Kafka and persist them to a database. If persisting a message fails, its offset is not committed.

My question is: how can we re-consume the uncommitted messages?

I have tried a few approaches.

  1. Restart the consumer. This works, but it relies on a restart.
  2. Catch the application exception and skip the Kafka commit. Keep the message in memory and then retry.

What's the best-practice approach in this kind of situation? Thanks in advance.

Upvotes: 3

Views: 2646

Answers (2)

Maksim Bazhenov

Reputation: 1

Catch the exception and publish the message to a separate Kafka retry topic, which another consumer processes independently.
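A minimal sketch of that routing decision, in plain Java so it runs without a broker. Everything here is hypothetical and only for illustration: the class name `RetryTopicRouter`, the `dbWriter` and `publisher` callbacks, and the retry topic name are assumptions. In a real application the `publisher` would typically wrap a Kafka producer's send call.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Diverts messages whose database write fails to a retry topic (illustrative sketch only). */
final class RetryTopicRouter {
    private final Consumer<String> dbWriter;             // hypothetical: persists one message value
    private final BiConsumer<String, String> publisher;  // hypothetical: publishes (topic, value)
    private final String retryTopic;                     // hypothetical topic name chosen by the caller

    RetryTopicRouter(Consumer<String> dbWriter,
                     BiConsumer<String, String> publisher,
                     String retryTopic) {
        this.dbWriter = dbWriter;
        this.publisher = publisher;
        this.retryTopic = retryTopic;
    }

    /**
     * Returns true if the value was saved, false if it was diverted to the retry topic.
     * In both cases the caller can commit the offset. If publishing itself fails,
     * the exception propagates and the caller should not commit.
     */
    boolean handle(String value) {
        try {
            dbWriter.accept(value);
            return true;
        } catch (RuntimeException e) {
            // The database write failed: hand the message to the retry topic instead of blocking.
            publisher.accept(retryTopic, value);
            return false;
        }
    }
}
```

The main consumer keeps moving because a failed message is no longer its responsibility once it has been published to the retry topic. The trade-off is that ordering between the main topic and the retry topic is not preserved.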

Upvotes: 0

donm

Reputation: 1210

If you use the spring-kafka project, you can use ContainerStoppingErrorHandler (or ContainerStoppingBatchErrorHandler for batch listeners), which stops the container on error. Below is a sample KafkaListener method that retries on DataAccessException and, once the retries are exhausted, passes the error to the error handler defined in the config class further down.

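// Requires spring-retry on the classpath and @EnableRetry on a configuration class.
@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaManualAckListenerContainerFactory")
@Retryable(include = DataAccessException.class, backoff = @Backoff(delay = 20000, multiplier = 3))
public void onMessage(List<ConsumerRecord<String, String>> recordList,
                      Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws DataAccessException {

    try {
        kafkaSinkController.saveToDb(recordList);
        // Commit offsets only after the batch has been persisted.
        acknowledgment.acknowledge();
        LOGGER.info("Message saved to DB");
    } catch (DataAccessException e) {
        // Rethrow so that @Retryable sees the failure; swallowing it here would disable the retry.
        throw e;
    } catch (Exception e) {
        LOGGER.error("Other than db exception ", e);
    }
}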

Config bean

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerConfig));
    factory.setConcurrency(concurrentConsumerCount);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    // Stops the container on error, so the consumer stops listening.
    factory.getContainerProperties().setBatchErrorHandler(new ContainerStoppingBatchErrorHandler());
    factory.setBatchListener(true);
    return factory;
}

When you want to resume consuming messages, you can start the container again through KafkaListenerEndpointRegistry. The sample method below is for reference; it can be invoked programmatically once the database is back up, for example by exposing it as an endpoint.

@Autowired
KafkaListenerEndpointRegistry registry;

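public void startContainer() {
    // Starts the registered listener containers, including the one stopped by the error handler.
    // Exceptions propagate to the caller instead of being silently swallowed.
    registry.start();
}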

The sample above relies entirely on Spring components, but the same can probably be achieved without the spring-kafka project.

Upvotes: 4
