Mukund Jalan

Reputation: 1339

Handling commits for errors with @KafkaListener in batch consumers

We have a Kafka consumer set up as shown below

  @Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
      final ConsumerFactory<String, Object> consumerFactory,
      @Value("${someProp.batch}") final boolean enableBatchListener,
      @Value("${someProp.concurrency}") final int consumerConcurrency,
      @Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
  ) {
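    // note: UNLIMITED_ATTEMPTS is FixedBackOff.UNLIMITED_ATTEMPTS and MANUAL_IMMEDIATE
    // is ContainerProperties.AckMode.MANUAL_IMMEDIATE (static imports)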
    final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
    errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));

    final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
    containerFactory.getContainerProperties().setMissingTopicsFatal(false);
    containerFactory.setBatchListener(enableBatchListener);
    containerFactory.setConcurrency(consumerConcurrency);
    containerFactory.setBatchErrorHandler(errorHandler);

    return containerFactory;
  }
with the following application configuration:

someProp:
  concurrency: 16
  batch: true
  error.backoff.ms: 2000
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      groupId: some-grp
      autoOffsetReset: earliest
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL}
        specific.avro.reader: true
        security.protocol: SSL
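
For reference, the listener itself looks roughly like this (a simplified sketch; the topic name, payload type, and process() method are placeholders):

  @KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
  public void listen(final List<SomeRecord> records, final Acknowledgment acknowledgment) {
    records.forEach(this::process);
    // manual commit once the whole batch has been processed
    acknowledgment.acknowledge();
  }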

In the batch listener method annotated with @KafkaListener, we call acknowledgment.acknowledge() at the end of processing the list. Assuming that, when the service comes up, there are already a million messages in the topic ready to be consumed, I have the following questions about this scenario, as I could not find documentation that covers batch listening in detail:

  1. The listener will receive a list of 500 messages, since max.poll.records is not set and therefore defaults to 500. Is this understanding correct?
  2. Given the above, where does consumer concurrency come into the picture? Does the stated configuration mean I will have 16 consumers, each of which can read 500 messages in parallel from the same topic?
  3. I understand that, in this case, I must have at least 16 partitions to make use of all the consumers; otherwise some consumers would be left doing nothing. Is that correct?
  4. Due to the SeekToCurrentBatchErrorHandler, the batch will be replayed if there is any exception in processing inside the listener method. So, if in a particular batch there is an exception processing the 50th message, the first 49 will be replayed (producing duplicates, which I am fine with), and messages 50 through 500 will then be replayed and processed as usual. Is this understanding correct?
  5. If multiple batches are being read continuously and a particular consumer thread gets stuck in the SeekToCurrentBatchErrorHandler, how is the offset commit handled, given that other consumer threads would still be processing messages successfully, moving their offsets well past the stuck consumer's offsets?
  6. The Javadoc for MANUAL_IMMEDIATE states:
/**
 * User takes responsibility for acks using an
 * {@link AcknowledgingMessageListener}. The consumer
 * immediately processes the commit.
 */
MANUAL_IMMEDIATE,

Does this mean that calling acknowledgment.acknowledge() is not sufficient and that AcknowledgingMessageListener has to be used in some way? If yes, what is the preferred approach?

Upvotes: 1

Views: 1061

Answers (1)

Gary Russell

Reputation: 174729

  1. You will get "up to" 500; there is no guarantee you will get exactly 500.

  2. Yes; 16 consumers (assuming you have at least 16 partitions).

  3. Correct.

  4. Correct; but version 2.5 now has the RecoveringBatchErrorHandler, whereby you can throw a special exception to tell it where in the batch the error occurred; it will commit the offsets of the successful records and seek the remaining ones (see the sketch after this list).

  5. The consumers get unique partitions so a consumer that is "stuck" has no impact on other consumers.

  6. I am not sure what you are asking there; if you are calling ack.acknowledge(), you are already using an AcknowledgingMessageListener (@KafkaListener always has that capability; we only populate the ack when a manual ack mode is used).
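
For point 4, the special exception under spring-kafka 2.5 is BatchListenerFailedException; a minimal sketch (the topic name, payload type, and process() method are placeholders):

  // spring-kafka 2.5+: replace SeekToCurrentBatchErrorHandler with
  // RecoveringBatchErrorHandler in the container factory
  containerFactory.setBatchErrorHandler(
      new RecoveringBatchErrorHandler(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS)));

  @KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
  public void listen(final List<SomeRecord> records) {
    for (int i = 0; i < records.size(); i++) {
      try {
        process(records.get(i));
      } catch (Exception e) {
        // tells the error handler which record failed; offsets of the records
        // before index i are committed, and the failed record onwards are re-sought
        throw new BatchListenerFailedException("Failed to process record", e, i);
      }
    }
  }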

However, you really don't need manual acks for this use case; the container will commit the offsets automatically when the listener exits normally, so there is no need to complicate your code unnecessarily.
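
To illustrate that simplification (a sketch, with the same placeholders as above): drop the Acknowledgment parameter and the MANUAL_IMMEDIATE setting, and let the default BATCH ack mode commit for you:

  // with the default AckMode.BATCH, the container commits the batch's offsets
  // itself when the listener returns without throwing
  @KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
  public void listen(final List<SomeRecord> records) {
    records.forEach(this::process);
  }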

Upvotes: 1
