Reputation: 444
The following are my configurations:
@Bean
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> kafkaListContFactory(
        @Qualifier("retryTemplate") RetryTemplate retryTemplate,
        @Qualifier("batchErrorHandler") ErrorHandler errorHandler,
        @Qualifier("batchErrorHandler") BatchErrorHandler batchErrorHandler) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Do not commit the offset when the listener throws
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(errorHandler);
    factory.setStatefulRetry(true);
    factory.setRetryTemplate(retryTemplate);
    return factory;
}
**Retry config**
@Bean("retryTemplate")
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(delay);
backOffPolicy.setMultiplier(multiplier);
backOffPolicy.setMaxInterval(20000);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
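For reference, the delay sequence that an exponential backoff with a cap produces can be modelled in plain Java. This is only a sketch and does not use Spring Retry: the class name `BackoffSchedule` is hypothetical, and the sample values for `delay` (1000 ms) and `multiplier` (2.0) are assumptions, since the configuration above does not show them. Only the 20000 ms cap comes from the configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Retry:
// models delay_n = initialInterval * multiplier^n, capped at maxInterval.
final class BackoffSchedule {

    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int attempts) {
        List<Long> result = new ArrayList<>();
        double current = initialInterval;
        for (int i = 0; i < attempts; i++) {
            result.add((long) Math.min(current, maxInterval));
            // Grow the interval, but never beyond the cap
            current = Math.min(current * multiplier, maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed values: delay = 1000 ms, multiplier = 2.0; cap = 20000 ms as configured
        System.out.println(delays(1000, 2.0, 20000, 7));
        // prints [1000, 2000, 4000, 8000, 16000, 20000, 20000]
    }
}
```

With `AlwaysRetryPolicy`, the sequence never ends; once the cap is reached, every further retry waits 20 seconds.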
**SeekToCurrentErrorHandler config**
I do not want the recoverer to run; I want retries to continue until processing succeeds, so I have set maxAttempts to -1.
@Bean("errorHandler")
public ErrorHandler errorHandler() {
    // -1 means unlimited attempts, so the recoverer below should never be reached
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
        if (t != null && (t instanceof RetryServiceException || t.getCause() instanceof RetryServiceException)) {
            // The {} placeholder is needed for the message argument to be logged
            logger.error("SeekToCurrentErrorHandler recoverer failure: {}", t.getMessage());
            throw new RetryServiceException("SeekToCurrentErrorHandler recoverer failure");
        }
    }, -1);
    return handler;
}
Finally, I acknowledge in the @KafkaListener method when no exception occurs.
My question: if I have configured -1 as the max attempts and my error handler takes care of retrying, do I still need the retryTemplate? Also, the retry does not actually repeat indefinitely. The other problem is that when I fetch a batch of records and one message in the poll fails, all of them are reprocessed, including the messages that already succeeded.
I need to use a batchErrorHandler and implement an exponential backoff strategy so that the retry is stateful and messages that already succeeded are not reprocessed. Could anyone help with this issue?
I also need to avoid a partition rebalance caused by misuse of max.poll.interval.ms.
Upvotes: 0
Views: 1878
Reputation: 174554
You need to use Stateful Retry to avoid a rebalance; however, with modern versions, you don't need listener-level retry at all because you can now perform the retries and back off at the error handler level.
Use the constructor that takes a BackOff and remove the container retry template.
/**
* Construct an instance with the provided recoverer which will be called after
* the backOff returns STOP for a topic/partition/offset.
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
* @param backOff the {@link BackOff}.
* @since 2.3
*/
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {
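A minimal sketch of what that could look like, assuming spring-kafka 2.3 or later and Spring's `ExponentialBackOff` from `org.springframework.util.backoff`. The class name, the 1000 ms initial interval, and the 2.0 multiplier are illustrative assumptions; the 20000 ms cap mirrors the retry template in the question. `ExponentialBackOff` leaves the maximum elapsed time effectively unlimited by default, so the failed record should keep being redelivered until it succeeds.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.ExponentialBackOff;

@Configuration
public class KafkaErrorHandlingConfig { // hypothetical class name

    @Bean("errorHandler")
    public ErrorHandler errorHandler() {
        // Illustrative values (assumptions): start at 1 s and double on each failure
        ExponentialBackOff backOff = new ExponentialBackOff(1000L, 2.0);
        // Same cap as the retry template in the question
        backOff.setMaxInterval(20000L);
        // maxElapsedTime is left at its default, so the back off is not expected to return STOP.
        // A null recoverer selects the default (logging) recoverer, per the Javadoc above.
        return new SeekToCurrentErrorHandler(null, backOff);
    }
}
```

With a handler like this, the container factory would no longer call `setRetryTemplate` or `setStatefulRetry`.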
> I need to use batchErrorHandler and implement a exponential backoff strategy so that retry will be stateful and reprocessing of same success messages should be avoided. Could anyone help with the above issue.
The framework can't help with batch listeners since it doesn't know where (in the batch) the failure occurred.
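Only the listener code knows which record in a batch failed, so any skipping of already-successful records has to happen there. The plain-Java sketch below shows one possible application-level approach, not something the framework provides: it remembers the last successfully processed offset per partition, so a redelivered batch skips records that already succeeded. `BatchProcessor`, `Rec`, and the in-memory map are hypothetical names with no Kafka types involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: skips records that succeeded in an earlier delivery of the same batch.
final class BatchProcessor {

    // Stand-in for a consumer record: partition, offset, payload
    static final class Rec {
        final int partition;
        final long offset;
        final String value;

        Rec(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    // Last successfully processed offset per partition (in memory only)
    private final Map<Integer, Long> lastDone = new HashMap<>();

    void process(List<Rec> batch, Consumer<Rec> handler) {
        for (Rec rec : batch) {
            Long done = lastDone.get(rec.partition);
            if (done != null && rec.offset <= done) {
                continue; // already processed in a previous attempt
            }
            handler.accept(rec); // an exception here fails the whole batch
            lastDone.put(rec.partition, rec.offset);
        }
    }

    public static void main(String[] args) {
        BatchProcessor processor = new BatchProcessor();
        List<Rec> batch = Arrays.asList(new Rec(0, 10, "a"), new Rec(0, 11, "b"), new Rec(0, 12, "c"));
        List<String> handled = new ArrayList<>();
        boolean[] failOnce = {true};
        Consumer<Rec> handler = rec -> {
            if (rec.value.equals("b") && failOnce[0]) {
                failOnce[0] = false;
                throw new IllegalStateException("transient failure");
            }
            handled.add(rec.value);
        };
        try {
            processor.process(batch, handler);
        } catch (IllegalStateException e) {
            // In a real listener, the error handler would cause the batch to be redelivered
        }
        processor.process(batch, handler); // simulated redelivery
        System.out.println(handled); // prints [a, b, c]
    }
}
```

Because the map lives in memory, this state is lost on restart or when a partition moves to another consumer, so the processing itself would still need to tolerate an occasional duplicate.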
Upvotes: 1