Madmartigan

Reputation: 13

Kafka Spring consumer offsets are not committed with ConsumerRecordRecoverer

Tech details:

Versions:

spring-boot : 2.2.2.RELEASE
spring-kafka : 2.3.7.RELEASE
kafka broker : 2.3.1 (via amazon MSK)

Props:

auto.offset.reset: earliest
enable.auto.commit: false
isolation.level: read_committed

Issue and Behavior:

I have a KafkaListener using a ConcurrentKafkaListenerContainerFactory configured with a custom implementation of ConsumerRecordRecoverer. I've noticed that when this container recovers a record after an exception, the consumer offset for that recovered message is not committed. Offsets are only committed when a message is processed successfully (i.e. no recovery). However, the listener/consumer/container does appear to retain the real offset in memory, since it advances past the recovered message while the application keeps running.

This becomes a problem if the Spring Boot application is restarted after the last message handled was one that had to be recovered rather than processed successfully: the consumer resumes from the last offset that was actually committed, likely reprocessing messages that were recovered but whose offsets were never committed.

I confirmed this with a local test on an empty topic (one way to inspect the committed offset is sketched after the steps).

  1. Before: Consumer group offset was 0 for partition 0 in kafka
  2. Pushed in a message that results in a listener exception and recovery.
  3. After: Consumer group offset remained 0 for partition 0, now with a lag.
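
The committed offset can be read programmatically with Kafka's AdminClient; the following is a minimal sketch, where the bootstrap server and group id are placeholders (the same information is available from the kafka-consumer-groups CLI):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    try (AdminClient admin = AdminClient.create(props)) {
      // Committed offsets for the consumer group, per topic-partition
      Map<TopicPartition, OffsetAndMetadata> committed =
          admin.listConsumerGroupOffsets("my-consumer-group") // placeholder group id
               .partitionsToOffsetAndMetadata()
               .get();
      committed.forEach((tp, om) -> System.out.println(tp + " committed=" + om.offset()));
    }
  }
}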

At this point I assume I am missing some critical configuration or setter on the Spring artifacts, but it is not clear to me what. I had assumed this was exactly what setting DefaultAfterRollbackProcessor#setCommitRecovered to true was for.

Code Samples

KafkaConfiguration

@Configuration
public class KafkaConfig {

  // Referenced in avroMessageProducerFactory below but not shown in the original snippet;
  // declared here (as an assumption) so the class compiles. May be null when no Avro
  // serializer bean is available.
  @Autowired(required = false)
  private Serializer<SpecificRecord> kafkaAvroSerializer;

  @Bean
  ConsumerRetryConfig retryConfig() {
    return new ConsumerRetryConfig();
  }

  @Bean
  public RetryTemplate consumerRetryTemplate(ConsumerRetryConfig consumerRetryConfig) {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(consumerRetryConfig.getRetryWaitInterval());
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(consumerRetryConfig.getMaxRetries());
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
  }

  @Bean
  @Lazy
  FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig() {
    return new FiniteRequeueingRecovererConfig();
  }

  @Bean
  @Lazy
  FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer(
    KafkaTemplate<String, SpecificRecord> kafkaTemplate,
    FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig
  ) {
    return new FiniteRequeueingRecordRecoverer(kafkaTemplate, finiteRequeueingRecovererConfig.getMaxRequeues());
  }

  @Bean
  @Lazy
  DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
    FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
    ConsumerRetryConfig consumerRetryConfig
  ) {
    DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
      finiteRequeueingRecordRecoverer,
      new FixedBackOff(
        consumerRetryConfig.getRetryWaitInterval(),
        consumerRetryConfig.getMaxRetries()
      )
    );
    ret.setCommitRecovered(true);
    return ret;
  }

  @Bean
  public ProducerFactory<String, SpecificRecord> avroMessageProducerFactory(KafkaProperties kafkaProperties) {
    Map<String, Object> props = MapBuilder.<String, Object>builder()
      .putAll(kafkaProperties.buildProducerProperties())
      .put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString())
      .build();

    return (kafkaAvroSerializer == null) ?
      new DefaultKafkaProducerFactory<>(props) :
      new DefaultKafkaProducerFactory<>(props, new StringSerializer(), kafkaAvroSerializer);
  }

  @Bean
  public KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTemplate<>(avroMessageProducerFactory);
  }

  @Bean
  public KafkaTransactionManager<?,?> kafkaTransactionManager(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTransactionManager<>(avroMessageProducerFactory);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> finiteRequeueingKafkaListenerContainerFactory(
    ConsumerFactory<Object, Object> consumerFactory,
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    KafkaTransactionManager<Object, Object> kafkaTransactionManager,
    DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor
  ) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

    factory.setStatefulRetry(true);
    factory.setAfterRollbackProcessor(finiteRequeueingRollbackProcessor);

    return factory;
  }

  @KafkaListener(
    id = "${some.listener-id}",
    topics = "${some.topic}",
    groupId = "${some.group-id}",
    containerFactory = "finiteRequeueingKafkaListenerContainerFactory"
  )
  public void consume(
    @Payload WebhookNotificationMessage message,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
  ) throws Exception {

    // Do the thing, maybe throw an exception

  }

}

FiniteRequeueingRecordRecoverer

public class FiniteRequeueingRecordRecoverer implements ConsumerRecordRecoverer {
  private final Logger logger = LoggerLike.getLogger(FiniteRequeueingRecordRecoverer.class);

  private KafkaTemplate<String, SpecificRecord> kafkaTemplate;
  private Integer maxRequeues;

  public FiniteRequeueingRecordRecoverer(KafkaTemplate<String, SpecificRecord> kafkaTemplate, Integer maxRequeues) {
    this.kafkaTemplate = kafkaTemplate;
    this.maxRequeues = maxRequeues;
  }

  @Override
  public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {

    // Not sure the substance of this recoverer is relevant...but if so
    // If the retry number in the avro record is < this.maxRequeues
    //   then increment the retries and re enqueue this message, move on
    // If retries have been exhausted, do not requeue and send to a dead letter or just abandon
  }
}
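
For completeness, one way the recovery logic described in those comments could look is sketched below. This is not the implementation from the question: it assumes the requeue count travels in a record header (hypothetical name x-requeue-count) rather than inside the Avro payload, and uses a hypothetical ".DLT" suffix for the dead-letter topic.

import java.nio.charset.StandardCharsets;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;

public class FiniteRequeueingRecordRecovererSketch implements ConsumerRecordRecoverer {

  private static final String REQUEUE_HEADER = "x-requeue-count"; // hypothetical header name

  private final KafkaTemplate<String, SpecificRecord> kafkaTemplate;
  private final int maxRequeues;

  public FiniteRequeueingRecordRecovererSketch(KafkaTemplate<String, SpecificRecord> kafkaTemplate,
                                               int maxRequeues) {
    this.kafkaTemplate = kafkaTemplate;
    this.maxRequeues = maxRequeues;
  }

  @Override
  public void accept(ConsumerRecord<?, ?> record, Exception e) {
    int requeues = currentRequeues(record);
    ProducerRecord<String, SpecificRecord> out;
    if (requeues < maxRequeues) {
      // Requeue on the same topic with an incremented count; the listener will see it again.
      out = new ProducerRecord<>(record.topic(), (String) record.key(), (SpecificRecord) record.value());
      out.headers().add(REQUEUE_HEADER,
          Integer.toString(requeues + 1).getBytes(StandardCharsets.UTF_8));
    } else {
      // Requeues exhausted: hand the record off to a dead-letter topic (suffix is hypothetical).
      out = new ProducerRecord<>(record.topic() + ".DLT", (String) record.key(),
          (SpecificRecord) record.value());
    }
    kafkaTemplate.send(out);
  }

  private int currentRequeues(ConsumerRecord<?, ?> record) {
    Header header = record.headers().lastHeader(REQUEUE_HEADER);
    return header == null ? 0 : Integer.parseInt(new String(header.value(), StandardCharsets.UTF_8));
  }
}

Note that the KafkaTemplate in the question is transactional, so these sends have to run inside a transaction, which is the case once the after-rollback processor is given the template (see the answer below).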

Upvotes: 1

Views: 4865

Answers (1)

Gary Russell

Reputation: 174799

The DefaultAfterRollbackProcessor needs a KafkaTemplate so it can send the recovered record's offset to a new transaction.

We should probably log a warning if commitRecovered is true and there is no KafkaTemplate.
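
Applied to the rollback-processor bean from the question, that looks something like the following sketch (assuming the kafkaTemplate property these versions expose, and reusing the transactional avroMessageKafkaTemplate bean):

@Bean
@Lazy
DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
    FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
    ConsumerRetryConfig consumerRetryConfig,
    KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate
) {
  DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
      finiteRequeueingRecordRecoverer,
      new FixedBackOff(
          consumerRetryConfig.getRetryWaitInterval(),
          consumerRetryConfig.getMaxRetries()
      )
  );
  ret.setCommitRecovered(true);
  // Without a template, commitRecovered has nothing to commit with: the template is what
  // sends the recovered record's offset to the new transaction started after the rollback.
  ret.setKafkaTemplate(avroMessageKafkaTemplate);
  return ret;
}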

Upvotes: 2
