Reputation: 149
In my consumer group, I want to commit messages manually once all tasks are finished (for example, after writing the message to a database). How can I disable auto commit and commit messages manually?
Upvotes: 2
Views: 7064
Reputation: 2751
Set autoCommit to false in the consumer options:
const kafka = require('kafka-node');
const config = require('../config');

const client = new kafka.KafkaClient({
  kafkaHost: config.kafkaHost
});

const consumer = new kafka.Consumer(client, [
  {
    topic: config.kafkaTopic
  }
], {
  autoCommit: false
});
Then commit manually once the message has been processed:
consumer.on('message', (message) => {
  console.log('message', message);
  // feed data into db
  consumer.commit((error, data) => {
    if (error) {
      console.error(error);
    } else {
      console.log('Commit success: ', data);
    }
  });
});
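If the database write is asynchronous, the commit should only run after that write has resolved. Otherwise the offset can be committed for a message that was never stored. Here is a minimal sketch of that ordering. The names commitAfter and saveToDb are hypothetical and not part of kafka-node; the only thing assumed about the consumer is the commit(callback) method used above.

```javascript
// Hypothetical helper (not part of kafka-node): wraps an async task so that
// the offset is committed only after the task has finished successfully.
// `consumer` is any object exposing commit(callback), as used in the answer above.
function commitAfter(consumer, task) {
  return async (message) => {
    try {
      // e.g. insert the message into the database
      await task(message);
    } catch (err) {
      // Task failed: skip the commit so the offset is not advanced by us.
      console.error('Task failed, offset not committed:', err);
      return false;
    }
    // Task succeeded: commit and report whether the commit itself worked.
    return new Promise((resolve) => {
      consumer.commit((error) => {
        if (error) {
          console.error(error);
          resolve(false);
        } else {
          resolve(true);
        }
      });
    });
  };
}
```

It would be wired up as consumer.on('message', commitAfter(consumer, saveToDb)), where saveToDb is your own function returning a promise. Note that messages keep arriving while a write is in flight, so if strict one-at-a-time processing matters you may also need to pause the consumer while a task runs.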
Upvotes: 2
Reputation: 621
Use node-rdkafka instead. It works very well. You can commit a message like this:
consumer.commit({ topic: data.topic, partition: data.partition, offset: data.offset + 1 }, function(err, data) {})
Upvotes: 0