sash
sash

Reputation: 1274

Spring kafka consumer, seek offset at runtime?

I am using the KafkaMessageListenerContainer for consuming from the kafka topic, I have an application logic to process each record which is dependent on other micro services as well. I am now manually committing the offset after each record is processed.

But if I the application logic fails I need to seek to the failed offset and keep processing it until it's succeeds. For that I need to do a run time manual seek of the last offset.

Is this possible with the KafkaMessageListenerContainer yet ?

Upvotes: 7

Views: 11450

Answers (2)

Valerie
Valerie

Reputation: 19

please refer to Spring Kafka document

"For an existing group ID, the initial offset is the current offset for that group ID".

And Confluent also mentioned: "If the consumer crashes or is shut down, its partitions will be re-assigned to another member, which will begin consumption from the last committed offset of each partition"

It means, in your case, when the consumer resumes, it will continue at the last committed offset. You also do not need to "seek to the failed offset" if you commit manually by

ENABLE_AUTO_COMMIT_CONFIG = False

and

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)

Upvotes: -1

Gary Russell
Gary Russell

Reputation: 174739

See Seeking to a Specific Offset.

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

Upvotes: 9

Related Questions