Reputation: 25
I tried the following code:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configure,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configure.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),3));
    return factory;
}
The int argument (the number 3) in this line
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),3));
is not accepted: the constructor requires a BackOff rather than an int.
Upvotes: 0
Views: 79
Reputation: 174769
That constructor was deprecated in 2.3.x and removed in 2.5.x.
See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
Previously, the configuration was "maxFailures" (which included the first delivery attempt). When using a FixedBackOff, its maxAttempts property represents the number of delivery retries (one less than the old maxFailures property). Also, maxFailures=-1 meant retry indefinitely with the old configuration; with a BackOff, you would set maxAttempts to Long.MAX_VALUE for a FixedBackOff and leave maxElapsedTime at its default in an ExponentialBackOff.
The equivalent now is
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L)));
(no delay, 3 delivery attempts: the initial delivery plus 2 retries).
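If you are migrating several handlers, the maxFailures-to-maxAttempts arithmetic quoted above can be sketched as a small helper. This is only an illustration in plain Java: the class BackOffMigration and the method toMaxAttempts are hypothetical names, not part of Spring Kafka, and rejecting values other than -1 or a positive count is an assumption of this sketch.

```java
// Hypothetical helper (not part of Spring Kafka): converts the old
// "maxFailures" setting into the "maxAttempts" value expected by FixedBackOff.
public final class BackOffMigration {

    private BackOffMigration() {
    }

    public static long toMaxAttempts(int maxFailures) {
        if (maxFailures == -1) {
            // Old "retry indefinitely"; Long.MAX_VALUE is the value of
            // FixedBackOff.UNLIMITED_ATTEMPTS.
            return Long.MAX_VALUE;
        }
        if (maxFailures < 1) {
            // Assumption of this sketch: any other non-positive value is rejected.
            throw new IllegalArgumentException("maxFailures must be -1 or >= 1");
        }
        // maxFailures counted the first delivery; maxAttempts counts only retries.
        return maxFailures - 1L;
    }
}
```

For example, toMaxAttempts(3) returns 2, which matches new FixedBackOff(0L, 2L) above, and toMaxAttempts(-1) returns Long.MAX_VALUE.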
Upvotes: 1