Reputation: 553
Scenario:
I set autoCommit to false,
produce 12 messages,
and consume them (say from offset 100).
Then I shut the consumer down
and start a new consumer.
At that stage, I expect the 2nd consumer to read all the messages again starting at offset 100 (because no commit was done),
but when I produce new messages I see that the 2nd consumer starts from the new offset (113), i.e. a commit still occurs somehow.
What am I getting wrong?
This is my consumer code:
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.14.10:9095']
});

const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  // admin
  await admin.connect();

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-test2' });
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString()
      });
    }
  });
};

run().catch(console.error);
Upvotes: 3
Views: 1535
Reputation: 318
For manual commit, set eachBatchAutoResolve to false (it is true by default), set autoCommit to false, and provide an auto-commit configuration (autoCommitInterval, autoCommitThreshold, or both).
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: false });
  await consumer.run({
    eachBatchAutoResolve: false,
    autoCommit: false,
    autoCommitInterval: 5000,
    autoCommitThreshold: 10,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
      pause,
    }) => {
      for (let message of batch.messages) {
        // Parse the payload (message processing would go here).
        let messageValue = JSON.parse(message.value);
        // Mark the message as processed, then commit if the
        // interval/threshold conditions are met.
        resolveOffset(message.offset);
        await commitOffsetsIfNecessary();
        await heartbeat();
      }
    },
  });
};
Upvotes: 0
Reputation: 30062
Autocommit only applies while processing the messages within a batch. KafkaJS always commits at the end of processing a batch, regardless of the autoCommit setting. From the docs:
The messages are always fetched in batches from Kafka, even when using the eachMessage handler. All resolved offsets will be committed to Kafka after processing the whole batch.
Committing offsets periodically during a batch allows the consumer to recover from group rebalancing, stale metadata and other issues before it has completed the entire batch. However, committing more often increases network traffic and slows down processing. Auto-commit offers more flexibility when committing offsets;
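As an illustration, a minimal sketch of the auto-commit tuning the docs refer to, assuming the consumer from the question (the interval and threshold values are arbitrary):
await consumer.run({
  autoCommit: true,            // let KafkaJS commit resolved offsets...
  autoCommitInterval: 5000,    // ...at most every 5 seconds,
  autoCommitThreshold: 100,    // ...or once 100 offsets are resolved
  eachMessage: async ({ partition, message }) => {
    console.log({ partition, offset: message.offset, value: message.value.toString() });
  }
});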
To add to that: I recently noticed that KafkaJS often consumes one message per batch, even if there are hundreds of messages waiting to be delivered. I haven't tracked this down yet to see whether it is the expected behaviour with Kafka or an issue with the default KafkaJS configuration.
Upvotes: 0
Reputation: 318
In kafkajs, when autoCommit is false, commit manually with either commitOffsetsIfNecessary (available inside eachBatch) or consumer.commitOffsets.
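For example, a minimal sketch assuming the consumer and topic from the question; note that Kafka expects the committed offset to be the next offset to read, hence the + 1:
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    // ...process the message...
    // Commit the next offset to consume (current offset + 1).
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() }
    ]);
  }
});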
Upvotes: 0
Reputation: 1
When a consumer is started with a group id that has no committed offset, it begins consuming from the latest available offset by default; it does not replay historical data. If you want it to start from the beginning, subscribe with fromBeginning (kafkajs has no autoOffsetReset consumer option; that name comes from other Kafka clients):
const consumer = kafka.consumer({ groupId: 'your-group-id' });
await consumer.subscribe({ topic: 'topic-test2', fromBeginning: true });
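fromBeginning only takes effect when the group has no committed offset yet. If the group has already committed, one way to rewind it is the kafkajs admin API; a sketch, assuming the group and topic from the question (the group must be disconnected while its offsets are reset):
const admin = kafka.admin();
await admin.connect();
// Move the group's committed offsets back to the earliest available.
await admin.resetOffsets({ groupId: 'test-group', topic: 'topic-test2', earliest: true });
await admin.disconnect();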
Upvotes: 0