Samir Aghjayev
Samir Aghjayev

Reputation: 1

Kafka consumer get message as string from recovery context

I created a config class for consumer with RetryTemplate and RecoveryCallback.

I want to just when works RecoveryCallback get consumer message as string. Anyway to do this?

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigurer,
      ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factoryConfigurer.configure(factory, kafkaConsumerFactory);
    factory.setRetryTemplate(kafkaRetry());
//    factory.setErrorHandler(kafkaErrorHandler);
    factory.setRecoveryCallback((context -> {
      ConsumerRecord record = (ConsumerRecord) context.getAttribute("record");
      log.info("Record value {}", record.value());
      System.out.println("========");
      System.out.println("========");
      return Optional.empty();
    }));
    return factory;
  }

  public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(backOffPeriod);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(maxAttempts);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
  }

Let me explain what I want to do. I want retry 3-4 times consume if not success when call fallback just save consumer message to database like json string. Just I cannot get message from consumer like json string in Fallback

Upvotes: 0

Views: 1179

Answers (1)

Gary Russell
Gary Russell

Reputation: 174729

Note that retry in the container factory/listener adapter is deprecated now that the error handlers have back offs and exception classification.

https://github.com/spring-projects/spring-kafka/issues/1886

The record is available in the record handler's recoverer.

Upvotes: 1

Related Questions