Noé Malzieu

Reputation: 2600

AMQPlib nodejs consumer task concurrency

I'm building a background task management system with RabbitMQ and Node.js using the amqplib module.

Some of the tasks are very CPU-intensive, so if I launch a lot of them while only a few workers are up, my server can get killed for using too much CPU.

I'm wondering if there is a way to set up an AMQP queue so that each consumer only handles one task from this queue at a time (i.e. until it has sent an ack or a reject, the consumer is not sent another task from this queue). Or should I handle this myself in code, for example by keeping a flag in my worker that says it is busy with a task from this queue and rejecting every other task from the queue until it finishes?

Here is my sample code.

I create the AMQP connection like this:

const amqpConn = require('amqplib').connect('amqp://localhost');

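My queue is named tasks:

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  const q = 'tasks';
  return ch.assertQueue(q).then((ok) => {
    // Publish a few sample tasks (the count of 10 is an arbitrary assumption).
    for (let i = 0; i < 10; i++) {
      // Buffer.from replaces the deprecated new Buffer(...) constructor.
      ch.sendToQueue(q, Buffer.from(`something to do ${i}`));
    }
  });
}).catch(console.warn);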

And here is my consumer (I guess this is where I should do the work to limit this queue to one concurrent task):

amqpConn.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue('tasks').then((ok) => {
    return ch.consume('tasks', (msg) => {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  });
}).catch(console.warn);

Thanks a lot!

Upvotes: 2

Views: 2473

Answers (2)

Noé Malzieu

Reputation: 2600

I think I got it working by:

  • creating one channel per queue
  • setting the channel's prefetch count (ch.prefetch(1) in amqplib) to limit how many unacknowledged messages each consumer holds at a time

https://www.rabbitmq.com/consumer-prefetch.html
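
The two points above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: startWorker and handleTask are hypothetical names, and the function takes an already-open amqplib connection so it does not depend on how the connection is created.

```javascript
// Sketch: one channel per queue, with prefetch(1) so the broker delivers
// at most one unacknowledged message to this consumer at a time.
// `startWorker` and `handleTask` are hypothetical names, not amqplib APIs.
async function startWorker(conn, queue, handleTask) {
  const ch = await conn.createChannel();
  await ch.assertQueue(queue);
  // Limit unacknowledged deliveries on this channel to 1.
  await ch.prefetch(1);
  return ch.consume(queue, async (msg) => {
    if (msg === null) return; // consumer was cancelled by the broker
    try {
      await handleTask(msg.content.toString());
      ch.ack(msg); // the next message is only delivered after this ack
    } catch (err) {
      console.warn(err);
      ch.nack(msg, false, false); // reject this one message without requeueing
    }
  });
}
```

With the connection from the question, this could be started with amqpConn.then((conn) => startWorker(conn, 'tasks', doWork)), where doWork is whatever function runs the CPU-heavy task. Whether a failed task should be requeued or dropped is a design choice; the sketch drops it to avoid a retry loop.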

Upvotes: 1

cantSleepNow

Reputation: 10192

I'm wondering if there is a way to create an amqp queue so that my consumers will only consume one task of this queue at a time

If this is what you really need, then yes: simply have exactly one consumer and declare the queue exclusive. That way only one task is consumed at a time.
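
A rough sketch of that setup with amqplib, under some assumptions: startExclusiveWorker and handleTask are hypothetical names, and handleTask is assumed to be synchronous (with asynchronous work and no prefetch limit, several messages can still be in flight at once). Note that an exclusive queue is tied to the connection that declared it and is deleted when that connection closes, which may not suit a task queue with separate producers.

```javascript
// Sketch: a single exclusive consumer on an exclusive queue.
// `startExclusiveWorker` and `handleTask` are hypothetical names.
async function startExclusiveWorker(conn, queue, handleTask) {
  const ch = await conn.createChannel();
  // exclusive: true scopes the queue to the declaring connection.
  await ch.assertQueue(queue, { exclusive: true });
  // exclusive: true on consume asks the broker to refuse other consumers.
  return ch.consume(queue, (msg) => {
    if (msg === null) return; // consumer was cancelled by the broker
    handleTask(msg.content.toString()); // assumed synchronous
    ch.ack(msg);
  }, { exclusive: true });
}
```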

Upvotes: 1
