Kenyon Rosewall
Kenyon Rosewall

Reputation: 73

Can I wait for a process to complete when consuming RabbitMQ messages with Node.js?

I'm pretty new to Node.js and ES6, and this is just confusing me a little bit. I am trying to leave a process running, consuming messages from a RabbitMQ queue. It needs to be able to process the message (which takes about 30-60 seconds) before it grabs the next message. Currently, the code I have, it grabs all messages it can and then tries to fork the processes. When there are 3-5 messages in the queue, this is fine, but for 20, 50 or 100 messages, this causes the server to run out of memory.

I have tried making the .consume() callback function async and adding await to the message processing function. I have tried wrapping an await new Promise within the .consume() callback around processMessage. I have tried adding await to the line that calls channel.consume. Nothing changes the behavior.

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

As a sidenote, if I create an array of strings and loop through the strings, when I add await to the processMessage line, it waits to execute process (30-60 seconds) before processing the next string.

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

So I basically need something that functions like this, but with listening to the queue in RabbitMQ.

Upvotes: 7

Views: 7997

Answers (3)

shkaper
shkaper

Reputation: 5008

If you want to limit the number of messages being processed by a consumer at any given time, use channel.prefetch():

The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.

That is, if you only want to be able process a single message at a time before moving on to the next, set channel.prefetch(1)

Upvotes: 11

CACHAC
CACHAC

Reputation: 107

If someone needs a complete answer:

You need to mix channel.prefetch(1), and { noAck: false } to consume one to one messages:

Simple Example:

const connection = await amqp.connect('amqp://localhost')    
const channel = await connection.createChannel()

await channel.assertQueue(queue, { durable: false })

// Set the number of messages to consume:
channel.prefetch(1)

await channel.consume(
      'QUEUE_NAME',
      async message => {
         if (message) {
            // YOUR ASYNC/AWAIT CODE

            // And then, ack the message manually:
            channel.ack(message)
         }
      },

      { noAck: false } // Set noAck to false to manually acknowledge messages
)

This is the way to consume a message at a time.

Upvotes: 1

Sheikh Rasel Ahmed
Sheikh Rasel Ahmed

Reputation: 139

moving on to the next, set channel.prefetch(1)

channel.prefetch(1)

Upvotes: 0

Related Questions