ItayB

Reputation: 11357

kafka-node start consume from last offset

I'm using kafka-node to consume messages from a specific Kafka topic. When I restart my Node server, it initializes my consumer as expected, but its default behavior is to start consuming from offset 0, while my goal is to receive only new messages (i.e., start consuming from the current offset). I couldn't find a way to do that in the API documentation. Does anyone know if it's supported?

Thanks!

Upvotes: 6

Views: 13177

Answers (3)

Vincent van Dam

Reputation: 21

Similar answer: this fetches the latest offset of every partition of the topic and sets the consumer to that offset minus 1 on each partition, so it consumes the last published message of the given topic.

// Assumes kafka, client, consumer and topic are already defined.
var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        console.log(`error fetching latest offsets ${err}`)
        return
    }
    // offsets[topic] maps each partition to its latest offset
    Object.keys(offsets[topic]).forEach(partition => {
        var latest = offsets[topic][partition]
        // Step back one message, but never below offset 0
        consumer.setOffset(topic, Number(partition), Math.max(latest - 1, 0))
    })
})
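
Note that the question asks for only new messages, while this answer steps back one message. Here is a minimal sketch of the difference, assuming the `{ topic: { partition: offset } }` shape that `fetchLatestOffsets` passes to its callback. The helper `startOffsets` and the sample numbers are hypothetical and not part of kafka-node:

```javascript
// Hypothetical helper (not a kafka-node API): turn a fetchLatestOffsets-style
// result into one starting position per partition.
function startOffsets(latestOffsets, topic, replayLast) {
    var partitions = latestOffsets[topic] || {};
    return Object.keys(partitions).map(function (p) {
        var latest = partitions[p];
        // The latest offset is the one the next message will receive, so
        // starting there yields only new messages; latest - 1 replays the
        // last stored message (clamped so it never goes below 0).
        var offset = replayLast ? Math.max(latest - 1, 0) : latest;
        return { topic: topic, partition: Number(p), offset: offset };
    });
}

// Made-up offsets for a two-partition topic.
var sample = { myTopic: { '0': 42, '1': 0 } };
var onlyNew = startOffsets(sample, 'myTopic', false);
var withLast = startOffsets(sample, 'myTopic', true);
```

Each entry could then be handed to `consumer.setOffset(entry.topic, entry.partition, entry.offset)` inside the callback.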

Upvotes: 2

ItayB

Reputation: 11357

I asked this question in the kafka-node GitHub issues (link) and got an answer. This is supported as of v0.4.0. The following snippet worked for me:

var kafka = require('kafka-node');

var consumerClient = new kafka.Client('localhost:2181');

/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);

// time: -1 requests the latest offset of the partition
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
        if (err) {
            console.error("Failed to fetch latest offset: " + err);
            return;
        }
        var latestOffset = data['myTopic']['0'][0];
        console.log("Consumer current offset: " + latestOffset);
});

var consumer = new kafka.HighLevelConsumer(
        consumerClient,
        [
            { topic: 'myTopic', partition: 0, fromOffset: -1 }
        ],
        {
            autoCommit: false
        }
);

Cheers!

Upvotes: 8

Hussain

Reputation: 1015

If you want to receive only new messages, you have to set the following property before creating the consumer instance: auto.offset.reset=latest
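
`auto.offset.reset` is the Java client's name for this setting. In newer kafka-node versions, the closest equivalent appears to be the `fromOffset` option of `ConsumerGroup`, which only takes effect when the group has no committed offset yet. A rough sketch under those assumptions follows. The broker address, the group name and the `createLatestConsumer` helper are hypothetical, and the module is passed in as a parameter so the example runs without a broker:

```javascript
// Sketch: `kafka` is expected to be the module from require('kafka-node').
function createLatestConsumer(kafka, topics) {
    var options = {
        kafkaHost: 'localhost:9092', // assumed broker address
        groupId: 'my-group',         // hypothetical consumer group name
        fromOffset: 'latest'         // used only if the group has no committed offset
    };
    return new kafka.ConsumerGroup(options, topics);
}

// Stand-in for kafka-node that only records its constructor arguments.
var fakeKafka = {
    ConsumerGroup: function (options, topics) {
        this.options = options;
        this.topics = topics;
    }
};
var groupConsumer = createLatestConsumer(fakeKafka, ['myTopic']);
```

With the real module, the call would be `createLatestConsumer(require('kafka-node'), ['myTopic'])`.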

Upvotes: -1
