Reputation: 31
I am relatively new to Kafka.
We process messages from kafka and persist them to database.if there is a failure when persisting them to database, the message will not be committed.
My questions are: we are wondering how we can re-consume the uncommitted messages?
I have tried a few approaches.
Whats the best practice approach in this kind of situation? Thanks in advance.
Upvotes: 3
Views: 2646
Reputation: 1
Catch an exception and post the message to a different Kafka retry-topic, which is processed separately by another Consumer.
Upvotes: 0
Reputation: 1210
If you use spring-Kafka project you can use ContainerStoppingErrorHandler Which will stop the container in error. Below is sample KafkaListener method which will retry on DataAccessException and after retires exhausted pass error to error handler defined in Config class below
@KafkaListener(topics = ("${spring.kafka.consumer.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory")
@Retryable(include = DataAccessException.class, backoff = @Backoff(delay = 20000, multiplier = 3))
public void onMessage(List<ConsumerRecord<String, String>> recordList,
Acknowledgment acknowledgment, Consumer<?, ?> consumer) throws DataAccessException {
try {
kafkaSinkController.saveToDb(recordList);
acknowledgment.acknowledge();
LOGGER.info("Message Saved DB");
} catch (Exception e) {
LOGGER.error("Other than db exception ", e)
}
}
Config bean
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerConfig));
factory.setConcurrency(concurrentConsumerCount);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setBatchErrorHandler(new ContainerStoppingBatchErrorHandler());
//It will stop container and thus consumer will stop listening
factory.setBatchListener(true);
return factory;
}
When you want to start re-consuming messages you can start container using KafkaListenerEndpointRegistry, sample method below for refrence which can be invoked programmatically once database is up for by exposing as endpoint for this method.
@Autowired
KafkaListenerEndpointRegistry registry;
public void startContainer() {
try {
registry.start();
} catch (Exception ex) {
//Todo
}
}
Above sample relies on all spring components, But same might be achieved without spring-kafka project.
Upvotes: 4