Reputation: 107
I have a Kafka consumer which retry 5 time and I am using Spring Kafka with retry template . Now if all retry are failed then how to does acknowledge work in that case . Also if i have set acknowledge mode to manually then how to acknowledge those message
Consumer
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
ack.acknowledge();
return null;
});
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
return factory;
}
Kafka Listener
@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload, Acknowledgment acknowledgment) throws Exception {
KafkaSegmentTrigger kafkaSegmentTrigger;
kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
processMessage(kafkaSegmentTrigger);
acknowledgment.acknowledge();
}
Upvotes: 2
Views: 2950
Reputation: 174494
If you have a RecoveryCallback
that handles the record when retries are exhausted, and you are NOT using manual acks, the container will commit the offset.
If you are using MANUAL acks, you can commit the offset in the recovery callback.
The context object passed to the callback has an attribute RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT
('acknowledgment').
It also has CONTEXT_CONSUMER
and CONTEXT_RECORD
.
Upvotes: 4