Trayambak Kumar

Reputation: 143

Is there a way to reset offsets of a Kafka consumer group through an API?

I have a use case where a consumer group is consuming messages, and I want to build an API to modify its offset. When the endpoint is called with an offset, I have to change the offset of the consumer group to that value. I am using Spring Boot, and the consumer is built using Spring Kafka. Thanks in advance.

Upvotes: 3

Views: 12132

Answers (2)

DDT

Reputation: 414

To reset an offset, the code has to get the consumer by group id, start it, and then stop it. While the consumer is stopped, an admin client has to be created. Once the client has been created, the code has to connect, reset the offset, and then disconnect (as posted below). Once the offset has been reset, the consumer group has to be restarted.

The code below resets the offset. The key is to get hold of the consumer instance so that the consumers can be stopped and started, because the offset cannot be reset while a consumer is running. We run this from the command line, which is nice. Hopefully this helps.

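let consumer;

return kafka.getConsumer(args.groupId)
    .then(consumerInstance => {
        consumer = consumerInstance;
        return consumer.run();
    }).then(() => {
        // the group must have no running consumers during the reset
        return consumer.stop();
    }).then(() => {
        // reset kafka client offset
    }).then(() => {
        // restart the consumer once the offset has been reset
        return consumer.run();
    });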

// reset kafka client offset
const { groupId, topic } = args;

const admin = createKafkaClient().admin();

return admin.connect()
    .then(() =>
        admin.resetOffsets({
            groupId,
            topic,
            earliest: true, // move the group back to the earliest offset
        })
    )
    .then(() => admin.disconnect());
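
The reset above moves the group to the earliest offset, while the question asks for a specific offset. Here is a minimal sketch of that variant. It assumes the admin client is KafkaJS, whose admin API also offers setOffsets, and that the group has no running members when it is called. The function name setGroupOffsets and the shape of its argument are hypothetical, chosen only for illustration.

```javascript
// Sketch only: set a stopped consumer group to explicit offsets.
// `admin` is injected; with KafkaJS it would come from `kafka.admin()`.
// `setGroupOffsets` is a hypothetical helper, not part of any library.
async function setGroupOffsets(admin, { groupId, topic, partitions }) {
  await admin.connect();
  try {
    // KafkaJS expects offsets as strings, e.g. [{ partition: 0, offset: '42' }].
    await admin.setOffsets({
      groupId,
      topic,
      partitions: partitions.map(({ partition, offset }) => ({
        partition,
        offset: String(offset),
      })),
    });
  } finally {
    // Release the connection even if the request is rejected.
    await admin.disconnect();
  }
}
```

Under those assumptions it could be called between the consumer.stop() and consumer.run() steps, for example as setGroupOffsets(createKafkaClient().admin(), { groupId, topic, partitions: [{ partition: 0, offset: 42 }] }).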

Upvotes: 0

Gary Russell

Reputation: 174699

Spring for Apache Kafka provides some convenience mechanisms for performing seeks, either during application initialization, or at any time thereafter.

The simplest is to have your listener extend AbstractConsumerSeekAware or implement ConsumerSeekAware.

Upvotes: 0
