Hammerbot

Reputation: 16364

NodeJS and RabbitMQ, how to be sure my message is processed

I am building a microservice-style application and use RabbitMQ for communication between my services.

I have a Node.js app that is supposed to receive messages from RabbitMQ and execute commands when a particular message comes in. Here is what the code below does:

  1. Connects to RabbitMQ
  2. Listens to symfony_messages queue
  3. If a message identified by product.created comes in, the script executes a particular command using spawn from child_process.

My question is: sometimes I am going to restart my script. How can I be sure that, at the moment of restarting, the script is not in the middle of processing an event? How can I be sure that the process is not going to consume a message and then stop before spawning the child process?

The possible solution that came to my mind is:

And here is the code (you do not need to read if you already get the question):

const amqp = require('amqplib/callback_api')
const path = require('path') // needed for path.join in the spawn call below
const { spawn } = require('child_process')

amqp.connect('amqp://guest:[email protected]:5672', (err, conn) => {

    if (err) {
        console.log(err)
        return
    }

    conn.createChannel((err, channel) => {
        // Bail out if the channel could not be opened
        if (err) {
            console.log(err)
            return
        }

        let q = 'symfony_messages'

        channel.assertQueue(q, {
            durable: false
        })

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);

        channel.consume(q, (msg) => {
            let event = JSON.parse(msg.content.toString())

            if (event.name === 'product.created') {
                console.log('Indexing order...')

                let cp = spawn('php', [path.join(__dirname, '..', '..', 'bin', 'console'), 'elastic:index:orders', event.payload.product_id])

                cp.stdout.on('data', (data) => {
                    console.log(`stdout: ${data}`);
                })

                cp.stderr.on('data', (data) => {
                    console.log(`stderr: ${data}`);
                })

                cp.on('close', (code) => {
                    console.log(`child process exited with code ${code}`);
                })
            }

        }, {noAck: true});
    })
})

Upvotes: 1

Views: 1885

Answers (1)

Terry Lennox

Reputation: 30725

Wouldn't it be a good pattern to call channel.ack(message) once the message has been processed successfully? You've set the noAck option to true, which means RabbitMQ treats a message as acknowledged as soon as it is delivered. If you set noAck to false and use the ACK mechanism instead, messages are only taken off the queue once they have been processed successfully.

Likewise, you can use channel.nack(message) to deliberately tell RabbitMQ that the message was not processed. I normally do this in the processing function's error handler (or promise.catch).
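
As a rough sketch of how ack and nack could be applied to the consumer from the question (not a definitive implementation): the helper names handleMessage, runIndexer and startConsumer are made up for illustration, and the choice to reject failed messages without requeueing is an assumption. With noAck set to false, RabbitMQ returns any unacknowledged message to the queue when the channel or connection closes, so a restart in the middle of processing should lead to redelivery rather than loss. That also means the command may run twice for the same message, so it should be safe to repeat.

```javascript
const path = require('path')
const { spawn } = require('child_process')

// Hypothetical helper: starts the indexing command from the question for one event.
function runIndexer(event) {
    const consolePath = path.join(__dirname, '..', '..', 'bin', 'console')
    return spawn('php', [consolePath, 'elastic:index:orders', String(event.payload.product_id)])
}

// Handles one delivery and acks or nacks it exactly once.
// `runCommand` is injected so the logic can be exercised without php or a broker.
function handleMessage(channel, msg, runCommand) {
    if (msg === null) return // the broker cancelled the consumer

    let event
    try {
        event = JSON.parse(msg.content.toString())
    } catch (e) {
        channel.nack(msg, false, false) // malformed payload: reject without requeue
        return
    }

    if (event.name !== 'product.created') {
        channel.ack(msg) // nothing to do for other events
        return
    }

    let settled = false
    const settle = (ok) => {
        if (settled) return // 'error' and 'close' can both fire for one child
        settled = true
        if (ok) channel.ack(msg)
        else channel.nack(msg, false, false) // assumption: failed messages are not requeued
    }

    const cp = runCommand(event)
    cp.on('error', () => settle(false))          // e.g. php could not be started
    cp.on('close', (code) => settle(code === 0)) // ack only after a clean exit
}

// Wiring against an already-open amqplib channel.
function startConsumer(channel, queue) {
    channel.prefetch(1) // at most one unacknowledged message in flight
    channel.consume(queue, (msg) => handleMessage(channel, msg, runIndexer), { noAck: false })
}
```

In the question's code, startConsumer(channel, q) would take the place of the channel.consume call. The prefetch(1) line is optional, but it keeps the number of messages that can be in flight during a restart down to one.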

I use a similar mechanism in a service that writes messages to a database: I only ACK a message once it has been written to the db. It's also useful to set up a dead letter exchange / queue within RabbitMQ so that any message that is Nacked without being requeued ends up there. You can then inspect these messages and see why they couldn't be processed (or automatically attempt to re-process them once the error condition that caused the problem is resolved).
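
One possible way to declare such a dead letter exchange and queue from the consumer itself, assuming an open amqplib callback_api channel like the one in the question. The function name setupDeadLetter and the '.dlx' / '.dead' suffixes are invented for this sketch. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so an already-existing symfony_messages queue would probably have to be deleted first, or the dead letter exchange attached through a policy instead.

```javascript
// Declares a dead letter exchange and queue, then the work queue that points at them.
// The '.dlx' and '.dead' suffixes are made-up names for this sketch.
function setupDeadLetter(channel, queue) {
    const dlx = queue + '.dlx'
    const dlq = queue + '.dead'

    // Fanout exchange: every dead-lettered message goes to all bound queues
    channel.assertExchange(dlx, 'fanout', { durable: true })
    channel.assertQueue(dlq, { durable: true })
    channel.bindQueue(dlq, dlx, '')

    // Messages nacked with requeue=false on `queue` are routed to `dlx`.
    // durable: false matches the queue declaration in the question.
    channel.assertQueue(queue, { durable: false, deadLetterExchange: dlx })

    return { dlx, dlq }
}
```

Calling setupDeadLetter(channel, 'symfony_messages') in place of the plain assertQueue call would be enough for rejected messages to accumulate in symfony_messages.dead, where they can be inspected or replayed later.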

Upvotes: 2
