devil
devil

Reputation: 19

node-rdkafka issue: Consumer gets disconnected after a while

When the app bootstraps, the consumer connects, and the logs confirm that it is consuming messages. After a while, however, it stops consuming messages, and the consumer console no longer shows any broker connection from the app's address.

Upvotes: 1

Views: 2164

Answers (1)

Ravi Ranjan
Ravi Ranjan

Reputation: 84

There are a few things to keep in mind while using Kafka:

1. Always run a number of consumers that is a multiple of the number of partitions.

Example: if there are 3 partitions, use 3, 6, or 9 consumers.

2. Keep the reconnect logic in the app.

Handle the disconnect events, and inside those handlers reconnect to the broker and resume polling for messages.

// Settings for the consumer client. The broker list is read from the
// KAFKA_HOST environment variable, automatic offset commits are switched off
// (offsets are committed explicitly elsewhere in this script), and
// partition-EOF reporting is switched on.
const consumerOptions = {
    'group.id': 'test-consumer-group',
    'metadata.broker.list': process.env.KAFKA_HOST,
    'enable.auto.commit': false,
    'enable.partition.eof': true
}

const consumer = new kafka.KafkaConsumer(consumerOptions)

// Start the initial connection attempt; the event handlers registered on
// `consumer` react to its outcome.
consumer.connect()

// Handles errors reported by the consumer client.
//
// On a timeout the current offsets are committed (best effort) and the batch
// counter is reset. For every error the connection is then torn down, the
// error is logged and SNMP trap 1001 is raised.
consumer.on('event.error', function (err) {
    // NOTE(review): node-rdkafka is expected to hand this handler an error
    // object rather than a plain string, in which case `err == 'ETIMEDOUT'`
    // alone never matches. The original comparison is kept and the error's
    // `code` is checked as well - confirm which code the deployed client
    // version reports for timeouts.
    const code = err && err.code
    const timeoutCode = kafka.CODES && kafka.CODES.ERRORS && kafka.CODES.ERRORS.ERR__TIMED_OUT
    const timedOut = err == 'ETIMEDOUT' ||
        code === 'ETIMEDOUT' ||
        (timeoutCode !== undefined && code === timeoutCode)

    if (timedOut) {
        try {
            consumer.commit()
        } catch (commitErr) {
            // A failed commit must not stop the disconnect, log and trap
            // below from running.
            logger('debug', {
                message: "Error committing offsets after timeout: " + commitErr
            })
        }
        count = 0
    }

    consumer.disconnect()
    logger('debug', {
        message: "Error connecting to kafka consumer" + err
    })
    snmp_trap(1001)
})

// Once the client reports that it has disconnected, note it on stdout and
// immediately start a new connection attempt. The event payload is not used.
consumer.on('disconnected', () => {
    console.log("Disconnected. Reconnecting...");
    consumer.connect();
});


// Handle of the polling timer started by the 'ready' handler below. It lives
// outside the handler so that a later 'ready' (after a reconnect) can stop
// the previous timer instead of stacking another one on top of it.
let pollTimer = null

// Runs whenever the consumer reports it is ready: subscribes to `topicName`
// and (re)starts a timer that periodically asks for a batch of messages.
//
// Environment:
//   POLL_SIZE     - messages requested per poll (default 100)
//   POLL_INTERVAL - milliseconds between polls (default 1000)
// Values that are missing, non-numeric or not positive fall back to the
// default.
consumer.on('ready', () => {
    const toPositiveInt = (raw, fallback) => {
        const parsed = parseInt(raw, 10)
        return Number.isNaN(parsed) || parsed <= 0 ? fallback : parsed
    }

    try {
        consumer.subscribe([topicName])
    } catch (err) {
        logger("debug", {
            message: "error subscribing the topic: ",
            topicName,
            error: String(err)
        })
    }

    const pollSize = toPositiveInt(process.env.POLL_SIZE, 100)
    const pollInterval = toPositiveInt(process.env.POLL_INTERVAL, 1000)

    // 'ready' fires again after every reconnect; drop the old timer first so
    // only one poll loop is ever active.
    if (pollTimer !== null) {
        clearInterval(pollTimer)
    }

    pollTimer = setInterval(() => {
        // The try/catch has to live inside the timer callback: an exception
        // thrown by consume() here would otherwise be uncaught.
        try {
            consumer.consume(pollSize)
        } catch (err) {
            logger("debug", {
                message: "error starting consumer",
                error: String(err)
            })
        }
    }, pollInterval)
})


// Handles every message delivered by the consumer.
//
// Messages without a value are ignored. A message whose topic/partition/offset
// was already seen is only committed again and otherwise skipped. New messages
// are recorded in `offsetMap`, pushed onto `msgQueue`, and once more than
// `queueSize` messages have been handled since the last commit the offset is
// committed and the counter reset.
consumer.on('data', (data) => {
    if (!data || !data.value) return

    // Kafka's committed offset is the position of the NEXT message to read,
    // so commit one past the message just handled; committing `data.offset`
    // itself makes that message get delivered again after a restart.
    const commitCurrent = () => consumer.commit({
        offset: data.offset + 1,
        partition: data.partition,
        topic: topicName
    })

    try {
        // node-rdkafka messages expose the topic as `topic`. The separators
        // keep e.g. partition 1 / offset 23 distinct from partition 12 /
        // offset 3.
        const offsetKey = `${data.topic}:${data.partition}:${data.offset}`

        const flag = offsetMap.get(offsetKey)
        if (flag) {
            logger("debug", `DUPLICATE OFFSET RECEIVED: ${flag}`)
            commitCurrent()
            return
        }

        offsetMap.put(offsetKey, true)
        logger("debug", {
            message: "getting data" + data.value
        })
        msgQueue.push(data, function (err) {
            // Surface queue errors instead of silently dropping them.
            if (err) {
                logger("debug", {
                    message: "error pushing data into queue" + err
                })
            }
        })

        logger("debug", {
            message: `Current Queue size: ${msgQueue.length()}`
        })

        count++;
        if (count > queueSize) {
            commitCurrent()
            count = 0
        }
    } catch (err) {
        console.log("ERR >>", err);
        // Only reconnect when the connection is actually gone; errors from
        // the map, queue or logger above do not mean the client is down.
        if (!consumer.isConnected()) {
            consumer.connect()
        }
    }

})

Upvotes: 2

Related Questions