user3842182

Reputation: 391

Ways to manually commit offsets in Kafka consumers using Spring Kafka

I am trying to figure out how to manually commit offsets in a Kafka consumer using Spring-Kafka (1.1.0.RELEASE). I understand that committing these offsets makes for a more robust client-side implementation, so that other consumers do not process duplicate events that were already processed by a now-dead consumer, or that are redelivered because a rebalance was triggered.

I see there are two ways to handle this -

However, with this approach, I don't know how I can get a reference to the consumer in order to invoke the consumer.commitSync() or consumer.commitAsync() APIs. Due to some technical restrictions, I cannot move to the latest version of spring-kafka, which supports ConsumerAwareRebalanceListener and provides a reference to the Consumer.

So how can I use ConsumerRebalanceListener to commit offsets to Kafka?

Also, I am starting multiple consumer listener threads using the ConcurrentKafkaListenerContainerFactory.setConcurrency() API. If a specific consumer thread dies, does it have its own instance of the ConsumerRebalanceListener?

Upvotes: 3

Views: 9236

Answers (1)

Gary Russell

Reputation: 174779

The consumer rebalance listener cannot be used to commit offsets with 1.x; the only way is via the Acknowledgment parameter.
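For illustration, a minimal configuration sketch of the Acknowledgment approach might look like the following. It is not taken from the original post: the broker address, group id, topic name, and the class and method names are all placeholders, and the AckMode import path shown is the 1.x location (it moved in later releases). It assumes spring-kafka 1.x and kafka-clients are on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
@EnableKafka
public class ManualAckConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
        // Turn off the client's own auto commit so the container controls commits.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(props));
        factory.setConcurrency(3); // as in the question; the value is arbitrary
        // MANUAL: the listener acknowledges, the container performs the commit.
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    @Bean
    public DemoListener demoListener() {
        return new DemoListener();
    }

    /** Hypothetical listener; only the Acknowledgment parameter matters here. */
    public static class DemoListener {

        @KafkaListener(topics = "demo-topic") // placeholder topic
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            handle(record.value());
            // Signal that this record is done; the container commits the offset.
            ack.acknowledge();
        }

        private void handle(String value) {
            // Application-specific processing would go here.
        }
    }
}
```

With this setup the listener never touches the Consumer itself; it only calls acknowledge(), and the container commits on the listener's behalf. AckMode.MANUAL_IMMEDIATE is also available if commits should not be batched.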

1.1.0 is very old; all 1.x users are recommended to upgrade to at least 1.3.5; it has a much simpler threading model thanks to KIP-62 - see the project page.

The current version 2.1.6 (2.1.7 will be released today) has many more options, including ConsumerAwareMessageListener where you can get full access to the consumer.

You cannot interact directly with the consumer from the listener with versions < 1.3 because the consumer is not thread safe and the listener has to be invoked on a different thread.
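The threading constraint can be pictured with a small, library-free model. This is only a toy sketch, not Spring Kafka's actual code: SingleThreadedConsumer, the acks queue, and run are hypothetical names. The listener thread never calls the consumer; it hands offsets back through a queue, and the thread that owns the consumer performs the commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AckHandOffSketch {

    /** Stand-in for a consumer that is not thread safe: only its owner thread may commit. */
    static final class SingleThreadedConsumer {
        private final Thread owner = Thread.currentThread();
        final List<Long> committed = new ArrayList<>();

        void commit(long offset) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("commit called from a non-owner thread");
            }
            committed.add(offset);
        }
    }

    /** Runs a listener thread that acknowledges recordCount offsets; returns what was committed. */
    static List<Long> run(int recordCount) {
        SingleThreadedConsumer consumer = new SingleThreadedConsumer(); // owned by the calling thread
        BlockingQueue<Long> acks = new LinkedBlockingQueue<>();

        // The listener runs on its own thread and must not touch the consumer.
        Thread listener = new Thread(() -> {
            for (long offset = 0; offset < recordCount; offset++) {
                acks.add(offset); // "acknowledge": hand the offset back to the owner thread
            }
        }, "listener");
        listener.start();

        try {
            // The owner thread drains the acknowledgments and commits them itself.
            for (int i = 0; i < recordCount; i++) {
                consumer.commit(acks.take());
            }
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for acknowledgments", e);
        }
        return consumer.committed;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [0, 1, 2]
    }
}
```

The Acknowledgment parameter plays the role of the queue hand-off in this model: the listener signals completion, and the commit happens on the thread that is allowed to use the consumer.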

Upvotes: 1
