yuria

Reputation: 553

How to make autoCommit: false work in KafkaJS

Scenario:
I'm setting autoCommit to false,
producing 12 messages,
consuming them (say from offset 100),
shutting down the consumer,
starting a new consumer.

At that stage, I expect the 2nd consumer to read all the messages again, starting from offset 100 (because no commit was done).

But when producing new messages I see that the 2nd consumer starts from the new offset (113), i.e. the commit still occurs somehow.

What am I getting wrong?

This is my consumer code:

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
  // admin
  await admin.connect();
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-test2'});
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString()
      });
    }
  });
};
run().catch(console.error);
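A sketch of the resume logic at play here, assuming the kafkajs convention that a group with no committed offset is reported as '-1' by admin.fetchOffsets (resumeFrom is a hypothetical helper, not a kafkajs API):

```javascript
// Hypothetical helper: given the committed offset a group has on record,
// work out where a rejoining consumer will resume reading.
function resumeFrom(committedOffset, { earliest, latest, fromBeginning }) {
  if (committedOffset === '-1') {
    // No commit on record: fall back to the reset point --
    // earliest with fromBeginning: true, otherwise latest.
    return fromBeginning ? earliest : latest;
  }
  // Otherwise resume at the committed offset (the next message to read).
  return committedOffset;
}

// The scenario above: offsets 100..112 produced, subscribe() without fromBeginning.
console.log(resumeFrom('-1', { earliest: '100', latest: '113', fromBeginning: false })); // '113'
console.log(resumeFrom('-1', { earliest: '100', latest: '113', fromBeginning: true }));  // '100'
```

Note that either a commit of 113 or a fallback to latest produces the behaviour described above, so checking the group's committed offsets (e.g. with kafkajs's admin.fetchOffsets) would distinguish the two.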

Upvotes: 3

Views: 1535

Answers (4)

ashish

Reputation: 318

For a manual commit, set eachBatchAutoResolve to false (it is true by default) and autoCommit to false, and provide an auto-commit configuration (autoCommitInterval, autoCommitThreshold, or both).

const run = async () => {
    await consumer.connect()
    await consumer.subscribe({ topic, fromBeginning: false })
    await consumer.run({
        eachBatchAutoResolve: false,
        autoCommit: false,
        autoCommitInterval: 5000,
        autoCommitThreshold: 10,
        eachBatch: async ({
            batch,
            resolveOffset,
            heartbeat,
            commitOffsetsIfNecessary,
            uncommittedOffsets,
            isRunning,
            isStale,
            pause,
        }) => {
            for (let message of batch.messages) {
                let messageValue = JSON.parse(message.value);
                resolveOffset(message.offset);
                await commitOffsetsIfNecessary();
                await heartbeat();
            }
        },
    });
}
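As a rough sketch of the idea (my reading of the docs, not kafkajs internals): with autoCommit: false, a call to commitOffsetsIfNecessary only actually commits once one of the two configured limits is hit.

```javascript
// Sketch of the decision commitOffsetsIfNecessary makes: commit once the
// configured interval (ms since last commit) or threshold (resolved-but-
// uncommitted offsets) is reached. Not kafkajs source, just the idea.
function shouldCommit({ now, lastCommitAt, pendingOffsets },
                      { autoCommitInterval, autoCommitThreshold }) {
  const intervalHit = autoCommitInterval != null &&
    now - lastCommitAt >= autoCommitInterval;
  const thresholdHit = autoCommitThreshold != null &&
    pendingOffsets >= autoCommitThreshold;
  return intervalHit || thresholdHit;
}

console.log(shouldCommit({ now: 6000, lastCommitAt: 0, pendingOffsets: 3 },
  { autoCommitInterval: 5000, autoCommitThreshold: 10 })); // true  (interval passed)
console.log(shouldCommit({ now: 1000, lastCommitAt: 0, pendingOffsets: 3 },
  { autoCommitInterval: 5000, autoCommitThreshold: 10 })); // false (neither limit hit)
```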

Upvotes: 0

Augusto

Reputation: 30062

Autocommit only applies when processing messages within a batch. KafkaJS always does a commit at the end of processing a batch regardless of the autocommit setting. From the docs:

The messages are always fetched in batches from Kafka, even when using the eachMessage handler. All resolved offsets will be committed to Kafka after processing the whole batch.

Committing offsets periodically during a batch allows the consumer to recover from group rebalancing, stale metadata and other issues before it has completed the entire batch. However, committing more often increases network traffic and slows down processing. Auto-commit offers more flexibility when committing offsets;

And to add to that: I recently noticed that KafkaJS often consumes 1 message per batch, even if there are hundreds of messages waiting to be delivered. I haven't tracked down yet whether this is expected Kafka behaviour or an issue with the default KafkaJS configuration.
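To illustrate the quoted behaviour (a sketch of the idea, not kafkajs source): offsets marked done via resolveOffset() during a batch are collected, and the highest one is what gets committed once the whole batch finishes.

```javascript
// Sketch: kafkajs tracks the offsets you resolveOffset() during a batch
// and, after the whole batch, commits the highest resolved one.
function highestResolved(resolvedOffsets) {
  // Offsets are strings in kafkajs, so compare numerically.
  return resolvedOffsets.reduce(
    (max, o) => (Number(o) > Number(max) ? o : max), '-1');
}

console.log(highestResolved(['100', '101', '102'])); // '102'
console.log(highestResolved([]));                    // '-1' (nothing to commit)
```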

Upvotes: 0

ashish

Reputation: 318

In KafkaJS, when using autoCommit: false, use either commitOffsetsIfNecessary or consumer.commitOffsets to commit manually.

See the KafkaJS docs on manual committing.
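For consumer.commitOffsets, note that Kafka's convention is to commit the offset of the next message to read, i.e. last processed + 1, and that kafkajs offsets are strings. A small hypothetical helper for building the payload:

```javascript
// Hypothetical helper: build one entry for consumer.commitOffsets(),
// committing lastProcessedOffset + 1 (the next offset to read).
function toCommitEntry(topic, partition, lastProcessedOffset) {
  return {
    topic,
    partition,
    offset: (Number(lastProcessedOffset) + 1).toString()
  };
}

console.log(toCommitEntry('topic-test2', 0, '112'));
// { topic: 'topic-test2', partition: 0, offset: '113' }
```

Used inside eachMessage this would look something like await consumer.commitOffsets([toCommitEntry(topic, partition, message.offset)]).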

Upvotes: 0

anmol kumar

Reputation: 1

When a new consumer group is created, it starts consuming from the latest available offset by default; it does not replay historical data. If you want a consumer group to start consuming from the beginning, subscribe with fromBeginning (note that KafkaJS has no autoOffsetReset option; fromBeginning is its equivalent of the Java client's auto.offset.reset):

const consumer = kafka.consumer({ groupId: 'your-group-id' });
await consumer.subscribe({ topic: 'your-topic', fromBeginning: true });

Upvotes: 0
