Reputation: 1339
We have a Kafka consumer set up as shown below:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    return new DefaultKafkaConsumerFactory<>(props);
}

// Assumed static imports for the constants used below:
//   org.springframework.util.backoff.FixedBackOff.UNLIMITED_ATTEMPTS
//   org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchFactory(
        final ConsumerFactory<String, Object> consumerFactory,
        @Value("${someProp.batch}") final boolean enableBatchListener,
        @Value("${someProp.concurrency}") final int consumerConcurrency,
        @Value("${someProp.error.backoff.ms}") final int errorBackoffInterval
) {
    final SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
    errorHandler.setBackOff(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS));

    final var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);
    containerFactory.getContainerProperties().setMissingTopicsFatal(false);
    containerFactory.setBatchListener(enableBatchListener);
    containerFactory.setConcurrency(consumerConcurrency);
    containerFactory.setBatchErrorHandler(errorHandler);
    return containerFactory;
}
someProp:
  concurrency: 16
  batch: true
  error.backoff.ms: 2000

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      groupId: some-grp
      autoOffsetReset: earliest
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDeserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        schema.registry.url: ${SCHEMA_REGISTRY_URL}
        specific.avro.reader: true
        security.protocol: SSL
In the batch listener method annotated with @KafkaListener, we call acknowledgment.acknowledge() at the end of processing the list (a sketch of such a listener is shown after the questions below). Assuming that when the service comes up I already have a million messages in the topic ready to be consumed by the service, I have the following questions about this scenario, as I could not find documentation that covers batch listening in this level of detail:
1. max.poll.records is not set and hence defaults to 500, so the list will have 500 messages. Is this understanding correct?
2. With SeekToCurrentBatchErrorHandler, the batch will be replayed in case there is any exception in processing inside the listener method. So, if in a particular batch there is an exception processing the 50th message, the first 49 will be played again (basically duplicates, which I am fine with), and messages 50 to 500 will be played and tried for processing as usual. Is this understanding correct?
3. With SeekToCurrentBatchErrorHandler, how is the offset commit handled, as other consumer threads would still be processing their messages successfully, thus moving the offset pointer well past the stuck consumer's offsets?
4. The javadoc for MANUAL_IMMEDIATE states:
/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,
Does this mean that calling acknowledgment.acknowledge() is not sufficient and AcknowledgingMessageListener has to be used in some way? If yes, what is the preferred approach?
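For reference, the listener described above is roughly of the following shape (just a sketch; the topic name, payload type, method name, and process() call are illustrative assumptions, not the actual service code):

@KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
public void onBatch(final List<SomeAvroRecord> records, final Acknowledgment acknowledgment) {
    // process the whole polled batch
    records.forEach(this::process);
    // manual ack at the end; with MANUAL_IMMEDIATE the commit is performed immediately
    acknowledgment.acknowledge();
}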
Upvotes: 1
Views: 1061
Reputation: 174729
You will get "up to" 500; there is no guarantee you will get exactly 500.
Yes; 16 consumers (assuming you have at least 16 partitions).
Correct.
Correct; but version 2.5 now has the RecoveringBatchErrorHandler, whereby you can throw a special exception to tell it where in the batch the error occurred; it will commit the offsets of the successful records and seek the remaining ones.
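A rough sketch of that approach on spring-kafka 2.5+ (the constructor, the BatchListenerFailedException name, and the listener shape reflect my reading of the 2.5 API; verify against the version you actually run):

// In the container factory: use the 2.5 batch error handler instead of SeekToCurrentBatchErrorHandler.
containerFactory.setBatchErrorHandler(
        new RecoveringBatchErrorHandler(new FixedBackOff(errorBackoffInterval, UNLIMITED_ATTEMPTS)));

// In the listener: report the index of the failing record.
// Offsets before that index are committed; the failing record and the rest of the batch are re-sought.
@KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
public void onBatch(final List<SomeAvroRecord> records) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(records.get(i));
        }
        catch (Exception ex) {
            throw new BatchListenerFailedException("failed to process record", ex, i);
        }
    }
}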
The consumers get unique partitions so a consumer that is "stuck" has no impact on other consumers.
I am not sure what you are asking there; if you are calling ack.acknowledge() you are already using an AcknowledgingMessageListener (@KafkaListener always has that capability; we only populate the ack with a manual ack mode).
However, you really don't need to use manual acks for this use case; the container will commit the offsets automatically when the listener exits normally; no need to unnecessarily complicate your code.
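In other words, with manual acks removed, something like the following sketch would do (with the default BATCH ack mode the container commits the offsets once the listener returns without throwing; names are illustrative):

// Factory: drop the MANUAL_IMMEDIATE line and let the default AckMode.BATCH apply.
// containerFactory.getContainerProperties().setAckMode(MANUAL_IMMEDIATE);

// Listener: no Acknowledgment parameter; offsets are committed after a successful return.
@KafkaListener(topics = "some-topic", containerFactory = "batchFactory")
public void onBatch(final List<SomeAvroRecord> records) {
    records.forEach(this::process);
}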
Upvotes: 1