Reputation: 19
When app bootstraps, consumer is connected and can be verified from logs it's consuming the messages. But after a while it stops consuming messages as well as there is no broker connected with app address seen in consumer console.
Upvotes: 1
Views: 2164
Reputation: 84
example: if there are 3 partitions, use 3,6 or 9 consumers
Implement disconnect events, and inside try to connect with the broker and poll the messages
const consumer = new kafka.KafkaConsumer({
'group.id': 'test-consumer-group',
'metadata.broker.list': process.env.KAFKA_HOST,
'enable.auto.commit': false,
'enable.partition.eof': true
})
consumer.connect()
consumer.on('event.error', function (err) {
if (err == 'ETIMEDOUT') {
consumer.commit()
count = 0
}
consumer.disconnect()
logger('debug', {
message: "Error connecting to kafka consumer" + err
})
snmp_trap(1001)
})
consumer.on('disconnected', function (data) {
console.log("Disconnected. Reconnecting...");
consumer.connect();
});
consumer.on('ready', () => {
try {
consumer.subscribe([topicName]);
} catch (err) {
logger("debug", {
message: "error subscribing the topic: ",
topicName
})
}
try {
setInterval(() => {
consumer.consume(parseInt(process.env.POLL_SIZE || 100));
}, parseInt(process.env.POLL_INTERVAL || 1000))
} catch (err) {
logger("debug", {
message: "error starting consumer"
})
}
})
consumer.on('data', (data) => {
if (!data || !data.value) return
try {
let flag = offsetMap.get(`${data.topicName}${data.partition}${data.offset}`)
if (flag) {
logger("debug", `DUPLICATE OFFSET RECEIVED: ${flag}`)
consumer.commit({
offset: data.offset,
partition: data.partition,
topic: topicName
})
return
}
offsetMap.put(`${data.topicName}${data.partition}${data.offset}`, true)
logger("debug", {
message: "getting data" + data.value
})
msgQueue.push(data, function (err) {
// logger("debug", "error pushing data into queue")
})
logger("debug", {
message: `Current Queue size: ${msgQueue.length()}`
})
count++;
if (count > queueSize) {
consumer.commit({
offset: data.offset,
topic: topicName,
partition: data.partition
})
count = 0
}
} catch(err) {
console.log("ERR >>", err);
consumer.connect()
}
})
Upvotes: 2