Reputation: 41
I'm working on a application that uses KafkaJS for message processing. Currently, when an error occurs during message processing, I use process.exit to terminate the process. However, this approach stops the whole process and the service wont restart itself to try to handle that message again or skips the remaining messages and doesn't retry the failed message before committing the offset.
I'm looking for a way to implement a retry mechanism in my KafkaJS consumer. The goal is to retry processing a message a certain number of times before committing its offset. If all retries fail, I would then handle the error accordingly (probably by loggging).
This is an example.
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ /* Kafka configuration */ });
const consumer = kafka.consumer({ /* Consumer configuration */ });
consumer.subscribe({ /* Topic configuration */ });
await this.consumer.run({
eachMessage: async ({ message }) => {
try {
// Message processing logic
} catch (e: unknown) {
logger.info(e);
await this.consumer.disconnect();
process.exit();
}
},
});
Ive tried with catching the NumberOfRetriesExceed kafkajs error but that is only for specific kafkajs errors and not application related errors.
I think the best way is maybe to implement a DLQ but I'm not sure.
Upvotes: 0
Views: 1349
Reputation: 318
Kafka js has internal retry mechanism. Above the default retry, one can implement custom retry. Refer https://kafka.js.org/docs/consumer-example and error handling strategies.
const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`
console.log(`- ${prefix} ${message.key}#${message.value}`)
},
})
}
//Event handlers
const errorTypes = ['unhandledRejection', 'uncaughtException']
errorTypes.forEach(type => {
process.on(type, async e => {
try {
console.log(`process.on ${type}`)
console.error(e)
// Disconnect the consumer
await consumer.disconnect();
} catch (_) {
log.error
}
//One can choose to exit the process or call run method using any custom logic
run();
})
})
//https://kafka.js.org/docs/1.11.0/instrumentation-events
//kafka consumer event handlers
consumer.on('consumer.crash',()=>{
/*. If your application wants to react to the error, such as by cleanly shutting down resources,restarting the consumer itself, or exiting the process entirely, it should listen to the CRASH event.*/
try {
// Disconnect the consumer
await consumer.disconnect();
} catch (_) {
log.error
}
//One can choose to exit the process or call run method using any custom logic
run();
})
Upvotes: 0