Reputation: 1737
I am using Spring Kafka 2.2.8 and trying to implement an infinite retry policy using AlwaysRetryPolicy. I have also created a custom SeekToCurrentErrorHandler to capture the exception, message key and value when deserialization fails.
Here is my consumer config code:
@Configuration
@EnableKafka
public class MyKafkaConsumerCommonConfig implements KafkaListenerConfigurer {
private final MySeekToCurrentErrorHandler mySeekToCurrentErrorHandler;
private final MyRecoveryCallback myRecoveryCallback;
private final MyKafkaConsumerRetryPolicy myKafkaConsumerRetryPolicy;
@Autowired
public MyKafkaConsumerCommonConfig(
MySeekToCurrentErrorHandler mySeekToCurrentErrorHandler,
MyRecoveryCallback myRecoveryCallback,
MyKafkaConsumerRetryPolicy myKafkaConsumerRetryPolicy) {
this.mySeekToCurrentErrorHandler = mySeekToCurrentErrorHandler;
this.myRecoveryCallback = myRecoveryCallback;
this.myKafkaConsumerRetryPolicy = myKafkaConsumerRetryPolicy;
}
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
// To achieve Stateful Retry
factory.setStatefulRetry(true);
factory.setErrorHandler(mySeekToCurrentErrorHandler);
// Retry Template
factory.setRetryTemplate(retryTemplate());
//Recovery Call back
factory.setRecoveryCallback(myRecoveryCallback);
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(getConfigs());
}
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{myKafkaRetryListener});
retryTemplate.setRetryPolicy(myConsumerRetryPolicy);
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(Long.parseLong(environment.getProperty("MY_KAFKA_RETRY_BACK_OFF_IN_MILLIS", "500")));
//As per the spring-kafka documentation, maxInterval (60000 ms) should be set less than max.poll.interval.ms (600000 ms)
exponentialBackOffPolicy.setMaxInterval(Long.parseLong(environment.getProperty("MY_KAFKA_MAX_RETRY_BACK_OFF_IN_MILLIS", "60000")));
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
return retryTemplate;
}
public Map<String, Object> getConfigs() {
Map<String, Object> configs = = new HashMap<>();
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());
sapphireKafkaConsumerConfig.setPartitionAssignmentStrategyConfig(
Collections.singletonList(RoundRobinAssignor.class));
// Set this to true so that you will have consumer record value coming as your pre-defined contract instead of a generic record
sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
}
}
Here is my code for custom RetryPolicy and custom SeekToCurrentErrorHandler.
/**
 * Retry policy that chooses a delegate policy for each failure by inspecting the
 * cause of the thrown exception.
 */
@Component
public class MyKafkaConsumerRetryPolicy extends ExceptionClassifierRetryPolicy {

    private final Environment environment;

    /**
     * @param environment source of the {@code MY_KAFKA_RETRY_COUNT} property
     */
    public MyKafkaConsumerRetryPolicy(Environment environment) {
        this.environment = environment;
    }

    /**
     * Installs the exception classifier once the bean has been constructed.
     *
     * <ul>
     *   <li>cause is {@code MyCustomException}: a fresh {@code AlwaysRetryPolicy}</li>
     *   <li>cause is {@code DeserializationException}: a fresh {@code NeverRetryPolicy}</li>
     *   <li>anything else: a shared {@code SimpleRetryPolicy} limited to
     *       {@code MY_KAFKA_RETRY_COUNT} attempts (default 3)</li>
     * </ul>
     */
    @PostConstruct
    public void init() {
        final int attemptLimit =
                Integer.parseInt(environment.getProperty("MY_KAFKA_RETRY_COUNT", "3"));
        final SimpleRetryPolicy boundedPolicy = new SimpleRetryPolicy();
        boundedPolicy.setMaxAttempts(attemptLimit);

        this.setExceptionClassifier(failure -> {
            final Throwable rootOfFailure = failure.getCause();
            // Always retry when the cause is a MyCustomException
            if (rootOfFailure instanceof MyCustomException) {
                return new AlwaysRetryPolicy();
            }
            // NOTE(review): deserialization failures are reportedly handed straight to the
            // container's error handler, so this branch may never be reached - confirm.
            if (rootOfFailure instanceof DeserializationException) {
                return new NeverRetryPolicy();
            }
            return boundedPolicy;
        });
    }
}
@Component
public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final MyLogger logger;
@Autowired
public MySeekToCurrentErrorHandler(MyLogger logger) {
super(-1);
this.logger = logger;
}
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data,
Consumer<?, ?> consumer, MessageListenerContainer container) {
if(thrownException instanceof DeserializationException){
// This portion would be executed when there is DeserializationException
final String messageKey = (String) data.get(0).key();
logger.logException(thrownException, "DeserializerException occurred for message key"+messageKey, getClass());
System.out.println("messageKey is "+messageKey);
System.out.println("message Value is "+data.get(0).value());
//TODO: Send message to DLT
} else {
//Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
super.handle(thrownException, data, consumer, container);
}
}
As you can see above, I'm setting the max attempts to '-1' in my custom SeekToCurrentErrorHandler constructor. To test this, I'm throwing MyCustomException in my consumer, which should select the AlwaysRetryPolicy and trigger infinite retries. However, it retries only 10 times and then stops. What am I missing here?
Upvotes: 0
Views: 1216
Reputation: 174759
} else if (classifiable.getCause() instanceof DeserializationException) {
That will never happen that far up the stack; deserialization exceptions are passed directly to the container's error handler. You already have code in your error handler for that case.
super(-1);
That should work; use a debugger and set a breakpoint in `FailedRecordTracker.skip()` to see if `this.maxFailures` is 10; if it is -1, it should always return `false`.
EDIT
Here's a simpler version of what you have (Spring Boot app) and it works as expected:
/**
 * Minimal Spring Boot application demonstrating a listener that always fails, paired
 * with a {@code SeekToCurrentErrorHandler} created with a failure limit of -1.
 */
@SpringBootApplication
public class So60641945Application {

    private static final Logger LOG = LoggerFactory.getLogger(So60641945Application.class);

    /** Number of times the listener has been invoked. */
    private final AtomicInteger count = new AtomicInteger();

    public static void main(String[] args) {
        SpringApplication.run(So60641945Application.class, args);
    }

    /**
     * @return a single-partition topic with replication factor 1
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic("so60641945", 1, (short) 1);
    }

    /**
     * @return an error handler that logs each invocation and then defers to
     *         {@code SeekToCurrentErrorHandler} constructed with -1
     */
    @Bean
    public ErrorHandler eh() {
        return new SeekToCurrentErrorHandler(-1) {

            @Override
            public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
                    Consumer<?, ?> consumer, MessageListenerContainer container) {
                LOG.info("handle");
                super.handle(thrownException, records, consumer, container);
            }
        };
    }

    /**
     * Logs the payload followed by the running delivery count, then fails on purpose
     * so that the retry/error-handling path is exercised.
     *
     * @param in the record value
     */
    @KafkaListener(id = "so60641945", topics = "so60641945")
    public void listen(String in) {
        final int delivery = this.count.incrementAndGet();
        LOG.info(in + delivery);
        throw new RuntimeException();
    }
}
/**
 * Customizes the injected listener container factory with an always-retry template,
 * a fixed one-second back-off and stateful retry.
 */
@Component
class FactoryConfigurer {

    /**
     * @param factory the container factory to customize
     */
    FactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        // Wait 1000 ms between attempts.
        FixedBackOffPolicy oneSecondPause = new FixedBackOffPolicy();
        oneSecondPause.setBackOffPeriod(1000);

        RetryTemplate alwaysRetry = new RetryTemplate();
        alwaysRetry.setRetryPolicy(new AlwaysRetryPolicy());
        alwaysRetry.setBackOffPolicy(oneSecondPause);

        factory.setRetryTemplate(alwaysRetry);
        factory.setStatefulRetry(true);
    }
}
and
2020-03-11 15:53:23.497 INFO 30480 --- [o60641945-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so60641945] Setting newly assigned partitions [so60641945-0]
2020-03-11 15:53:23.519 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo1
2020-03-11 15:53:24.521 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:24.526 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo2
2020-03-11 15:53:25.527 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:25.529 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo3
2020-03-11 15:53:26.534 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:26.537 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo4
2020-03-11 15:53:27.539 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:27.541 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo5
2020-03-11 15:53:28.542 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:28.546 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo6
2020-03-11 15:53:29.549 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:29.552 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo7
2020-03-11 15:53:30.553 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:30.555 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo8
2020-03-11 15:53:31.559 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:31.561 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo9
2020-03-11 15:53:32.562 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:32.564 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo10
2020-03-11 15:53:33.568 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:33.571 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo11
2020-03-11 15:53:34.572 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:34.574 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo12
2020-03-11 15:53:35.577 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:35.580 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo13
2020-03-11 15:53:36.582 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:36.584 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo14
2020-03-11 15:53:37.587 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : handle
2020-03-11 15:53:37.589 INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application : foo15
...
EDIT2
Embedded kafka:
/**
 * Boots the application against an embedded Kafka broker, publishes one record and
 * keeps the context alive for 30 seconds.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class So60641945ApplicationTests {

    /** Embedded Kafka rule created with an argument of 1. */
    @ClassRule
    public static EmbeddedKafkaRule embedded = new EmbeddedKafkaRule(1);

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Sends a single record and then sleeps, presumably so the retrying listener can
     * be observed in the log output.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void contextLoads() throws InterruptedException {
        final String topicName = "so60641945";
        final String payload = "bar";
        final long observationWindowMillis = 30_000;

        this.template.send(topicName, payload);
        Thread.sleep(observationWindowMillis);
    }
}
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Upvotes: 2