Raj
Raj

Reputation: 1737

Issue in implementing infinite retry in kafka consumer

I am using spring kafka 2.2.8 and trying to implement infinite retry policy using AlwaysRetryPolicy and I've created a custom SeekToCurrentErrorHandler to capture the exception, message key and value during deserialization.

Here is my consumer config code:

@Configuration
@EnableKafka
public class MyKafkaConsumerCommonConfig implements KafkaListenerConfigurer {
        private final MySeekToCurrentErrorHandler mySeekToCurrentErrorHandler;
        private final MyRecoveryCallback myRecoveryCallback;
        private final MyKafkaConsumerRetryPolicy myKafkaConsumerRetryPolicy;

        @Autowired
        public MyKafkaConsumerCommonConfig(
            MySeekToCurrentErrorHandler mySeekToCurrentErrorHandler,
            MyRecoveryCallback myRecoveryCallback,
            MyKafkaConsumerRetryPolicy myKafkaConsumerRetryPolicy) {

            this.mySeekToCurrentErrorHandler = mySeekToCurrentErrorHandler;
            this.myRecoveryCallback = myRecoveryCallback;
            this.myKafkaConsumerRetryPolicy = myKafkaConsumerRetryPolicy;
        }

        @Bean
        public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(primaryConsumerFactory());

          // To achieve Stateful Retry
          factory.setStatefulRetry(true);
          factory.setErrorHandler(mySeekToCurrentErrorHandler);

          // Retry Template
          factory.setRetryTemplate(retryTemplate());

          //Recovery Call back
          factory.setRecoveryCallback(myRecoveryCallback);

          return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
          return new DefaultKafkaConsumerFactory<>(getConfigs());
        }

        public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setListeners(new RetryListener[]{myKafkaRetryListener});
            retryTemplate.setRetryPolicy(myConsumerRetryPolicy);

            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(Long.parseLong(environment.getProperty("MY_KAFKA_RETRY_BACK_OFF_IN_MILLIS", "500")));

            //As per the spring-kafka documentation, maxInterval (60000 ms) should be set less than max.poll.interval.ms (600000 ms)
            exponentialBackOffPolicy.setMaxInterval(Long.parseLong(environment.getProperty("MY_KAFKA_MAX_RETRY_BACK_OFF_IN_MILLIS", "60000")));
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
       }

       public Map<String, Object> getConfigs() {

           Map<String, Object> configs = = new HashMap<>();

           configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

           configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
           configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

           configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
           configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

           sapphireKafkaConsumerConfig.setPartitionAssignmentStrategyConfig(
               Collections.singletonList(RoundRobinAssignor.class));

           // Set this to true so that you will have consumer record value coming as your pre-defined contract instead of a generic record
           sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
         }
}

Here is my code for custom RetryPolicy and custom SeekToCurrentErrorHandler.

/**
 * Retry policy that picks a delegate policy based on the cause of the thrown exception:
 * always retry for {@code MyCustomException}, never retry for {@code DeserializationException},
 * and a bounded {@code SimpleRetryPolicy} for everything else.
 */
@Component
public class MyKafkaConsumerRetryPolicy extends ExceptionClassifierRetryPolicy {

  private final Environment environment;

  public MyKafkaConsumerRetryPolicy(Environment environment) {
    this.environment = environment;
  }

  /**
   * Installs the exception classifier. The bounded policy's attempt count is read from
   * {@code MY_KAFKA_RETRY_COUNT} (default 3).
   */
  @PostConstruct
  public void init() {
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    simpleRetryPolicy.setMaxAttempts(Integer.parseInt(environment.getProperty("MY_KAFKA_RETRY_COUNT", "3")));

    // Delegate policies are created once and shared, like simpleRetryPolicy above. The classifier
    // runs for each classified failure, so allocating a new policy per call is wasteful in an
    // unbounded retry loop.
    AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
    NeverRetryPolicy neverRetryPolicy = new NeverRetryPolicy();

    this.setExceptionClassifier(classifiable ->
    {
      // Guard against a null throwable so classification never fails with an NPE.
      Throwable cause = classifiable == null ? null : classifiable.getCause();

      // Always Retry when instanceOf   MyCustomException
      if (cause instanceof MyCustomException) {
        return alwaysRetryPolicy;
      } else if (cause instanceof DeserializationException) {
        // NOTE(review): deserialization failures are reported to go straight to the container's
        // error handler, so this branch is not expected to be reached; kept as a safety net.
        return neverRetryPolicy;
      } else {
        return simpleRetryPolicy;
      }
    });
  }
}



@Component
public class MySeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

  private final MyLogger logger;

  @Autowired
  public MySeekToCurrentErrorHandler(MyLogger logger) {

   super(-1);

    this.logger = logger;
  }

  @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> data,
        Consumer<?, ?> consumer, MessageListenerContainer container) {

  if(thrownException instanceof DeserializationException){

    // This portion would be executed when there is DeserializationException
    final String messageKey = (String) data.get(0).key();

   logger.logException(thrownException, "DeserializerException occurred for  message key"+messageKey, getClass());

    System.out.println("messageKey is "+messageKey);
    System.out.println("message Value is "+data.get(0).value());

    //TODO: Send message to DLT
  } else {

   //Calling super method to let the 'SeekToCurrentErrorHandler' do what it is actually designed for
    super.handle(thrownException, data, consumer, container);
  }
}

As you can see above, I'm setting the max attempts to '-1' in my custom SeekToCurrentErrorHandler constructor. To test this, I'm throwing MyCustomException in my consumer, which should select the AlwaysRetryPolicy and trigger infinite retries. But it retries only 10 times and then stops. What am I missing here?

Upvotes: 0

Views: 1216

Answers (1)

Gary Russell
Gary Russell

Reputation: 174759

  1. } else if (classifiable.getCause() instanceof DeserializationException) {

That will never happen that far up the stack; deserialization exceptions are passed directly to the container's error handler. You already have code in your error handler for that case.

  2. super(-1);

That should work; use a debugger; set a breakpoint in FailedRecordTracker.skip() to see if this.maxFailures is 10; if it is -1, it should always return false.

EDIT

Here's a simpler version of what you have (Spring Boot app) and it works as expected:

/**
 * Minimal Spring Boot reproduction: a listener that always fails, paired with a
 * seek-to-current error handler configured with -1 so the record keeps being redelivered.
 */
@SpringBootApplication
public class So60641945Application {

    private static final Logger LOG = LoggerFactory.getLogger(So60641945Application.class);

    /** Number of times the listener has been invoked. */
    private final AtomicInteger count = new AtomicInteger();

    public static void main(String[] args) {
        SpringApplication.run(So60641945Application.class, args);
    }

    /**
     * Error handler that logs each invocation and then defers to the stock
     * seek-to-current behaviour.
     */
    @Bean
    public ErrorHandler eh() {
        return new SeekToCurrentErrorHandler(-1) {

            @Override
            public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
                    Consumer<?, ?> consumer, MessageListenerContainer container) {

                LOG.info("handle");
                super.handle(thrownException, records, consumer, container);
            }

        };
    }

    /** Logs the payload with the running delivery count, then fails unconditionally. */
    @KafkaListener(id = "so60641945", topics = "so60641945")
    public void listen(String in) {
        final int delivery = this.count.incrementAndGet();
        LOG.info(in + delivery);
        throw new RuntimeException();
    }

    /** Declares the single-partition, single-replica topic used by the listener. */
    @Bean
    public NewTopic topic() {
        final int partitions = 1;
        final short replicationFactor = 1;
        return new NewTopic("so60641945", partitions, replicationFactor);
    }

}

/**
 * Customises the auto-configured listener container factory with stateful retry and a
 * retry template that always retries, pausing a fixed 1000 ms between attempts.
 */
@Component
class FactoryConfigurer {

    FactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        // Fixed pause between attempts.
        FixedBackOffPolicy fixedPause = new FixedBackOffPolicy();
        fixedPause.setBackOffPeriod(1000);

        // Template that never gives up.
        RetryTemplate template = new RetryTemplate();
        template.setBackOffPolicy(fixedPause);
        template.setRetryPolicy(new AlwaysRetryPolicy());

        factory.setStatefulRetry(true);
        factory.setRetryTemplate(template);
    }

}

and

2020-03-11 15:53:23.497  INFO 30480 --- [o60641945-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so60641945] Setting newly assigned partitions [so60641945-0]
2020-03-11 15:53:23.519  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo1
2020-03-11 15:53:24.521  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:24.526  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo2
2020-03-11 15:53:25.527  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:25.529  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo3
2020-03-11 15:53:26.534  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:26.537  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo4
2020-03-11 15:53:27.539  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:27.541  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo5
2020-03-11 15:53:28.542  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:28.546  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo6
2020-03-11 15:53:29.549  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:29.552  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo7
2020-03-11 15:53:30.553  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:30.555  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo8
2020-03-11 15:53:31.559  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:31.561  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo9
2020-03-11 15:53:32.562  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:32.564  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo10
2020-03-11 15:53:33.568  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:33.571  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo11
2020-03-11 15:53:34.572  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:34.574  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo12
2020-03-11 15:53:35.577  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:35.580  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo13
2020-03-11 15:53:36.582  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:36.584  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo14
2020-03-11 15:53:37.587  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : handle
2020-03-11 15:53:37.589  INFO 30480 --- [o60641945-0-C-1] com.example.demo.So60641945Application   : foo15
...

EDIT2

Embedded kafka:

/**
 * Runs the application against an embedded Kafka broker: publishes one record and keeps the
 * context alive long enough to watch the repeated redeliveries in the log.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class So60641945ApplicationTests {

    /** Single embedded broker shared by the test class. */
    @ClassRule
    public static EmbeddedKafkaRule embedded = new EmbeddedKafkaRule(1);

    @Autowired
    private KafkaTemplate<String, String> template;

    /** Sends one record, then waits 30 seconds while the listener keeps failing. */
    @Test
    public void contextLoads() throws InterruptedException {
        template.send("so60641945", "bar");

        final long observationWindowMillis = 30_000L;
        Thread.sleep(observationWindowMillis);
    }

}
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

Upvotes: 2

Related Questions