Reputation: 1
I created a consumer config class with a RetryTemplate and a RecoveryCallback.
When the RecoveryCallback runs, I just want to get the consumed message as a string. Is there any way to do this?
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigurer.configure(factory, kafkaConsumerFactory);
    factory.setRetryTemplate(kafkaRetry());
    // factory.setErrorHandler(kafkaErrorHandler);
    factory.setRecoveryCallback((context -> {
        ConsumerRecord record = (ConsumerRecord) context.getAttribute("record");
        log.info("Record value {}", record.value());
        System.out.println("========");
        System.out.println("========");
        return Optional.empty();
    }));
    return factory;
}

public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(backOffPeriod);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(maxAttempts);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
Let me explain what I want to do. I want to retry consuming 3-4 times and, if it still fails, call a fallback that saves the consumed message to the database as a JSON string. The problem is that I cannot get the consumed message as a JSON string in the fallback.
Upvotes: 0
Views: 1179
Reputation: 174729
Note that retry in the container factory/listener adapter is deprecated now that the error handlers have back offs and exception classification.
https://github.com/spring-projects/spring-kafka/issues/1886
The record is available in the error handler's recoverer.
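As a rough sketch of that approach, assuming spring-kafka 2.8 or later (where DefaultErrorHandler and setCommonErrorHandler are available) and a String value deserializer so that record.value() is already the JSON text: the back off replaces the RetryTemplate, and the recoverer receives the failed ConsumerRecord once retries are exhausted. FailedMessageStore is a hypothetical interface standing in for your own database code, and the 1 second interval and 3 retries are placeholder values.

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    /** Hypothetical persistence hook (not part of Spring Kafka); implement it with your own repository. */
    public interface FailedMessageStore {
        void save(String topic, String payload);
    }

    @Bean
    public DefaultErrorHandler kafkaErrorHandler(FailedMessageStore store) {
        // Called once per failed record, after the back off below is exhausted.
        // Assumes a String value deserializer, so value() is the raw JSON text.
        ConsumerRecordRecoverer recoverer = (record, ex) ->
                store.save(record.topic(), String.valueOf(record.value()));

        // 1000 ms between attempts, 3 retries after the initial delivery (4 attempts in total).
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            DefaultErrorHandler kafkaErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factoryConfigurer.configure(factory, kafkaConsumerFactory);
        // Replaces setRetryTemplate(...) and setRecoveryCallback(...).
        factory.setCommonErrorHandler(kafkaErrorHandler);
        return factory;
    }
}
```

With this in place the listener simply throws on failure; the container redelivers the record according to the back off and then hands it to the recoverer, after which the offset is committed and consumption continues with the next record.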
Upvotes: 1