Reputation: 427
One of my use cases consists of consuming data, performing some operations on it, and producing the result to a new topic.
I'm using the https://www.npmjs.com/package/kafkajs npm library.
I would like to commit the offset manually after the operations have succeeded, to avoid any data loss. I'm using autoCommit: false
so that offsets are not committed automatically after consuming.
This is the code to commit the offset manually:
consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' }
])
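For context, here is roughly what the whole flow looks like end to end. The broker address, group id, topic names and the transform step are just placeholders for my real setup:

const { Kafka } = require('kafkajs');

// Placeholder client configuration
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'my-group' });
const producer = kafka.producer();

const run = async () => {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'topic-A', fromBeginning: false }); // ({ topics: ['topic-A'] } on kafkajs 2.x)

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      // Placeholder transform step
      const transformed = message.value.toString().toUpperCase();

      // Produce the result to the new topic first...
      await producer.send({ topic: 'topic-B', messages: [{ value: transformed }] });

      // ...and only then commit the next offset, so a crash before this
      // point means the message is redelivered rather than lost.
      await consumer.commitOffsets([
        { topic, partition, offset: (Number(message.offset) + 1).toString() },
      ]);
    },
  });
};

run().catch(console.error);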
I've read that if we commit every offset individually (committing immediately after consuming each message), it puts extra load on the brokers and is not recommended.
I need a Kafka expert's advice on the best approach for this use case to avoid any data loss. Please advise.
Upvotes: 6
Views: 6431
Reputation: 435
In order to handle commits manually, below is the code:
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    ...
    // Commit the *next* offset (message.offset + 1) only after processing has succeeded.
    await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
  },
});
Upvotes: 4