i474
i474

Reputation: 664

Get all messages from rabbitMQ

I'm trying to get all messages from rabbitMQ queue.

const messages = await rabbit.getMessages(outputQueue, false);

Here is realization of getMessages method. The problem is it processes only 3-5 messages and calling 'resolve'. Some time later it processes rest messages, but 'resolve' has already been called and it can't be executed again.

const amqp = require('amqplib');
.
.
let amqpUrl;
let queueConf;

const init = (connection, queue) => {
  amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio    n.port}`;
  if (connection.vhost) {
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`;
  }
  queueConf = queue;
}

const getChannel = () => new Promise((resolve) => {
  amqp.connect(amqpUrl).then((conn) => {
    conn.createChannel().then((ch) => {
      ch.prefetch(1000).then(() => resolve(ch))
    })
  })
})

module.exports = (connection, queue) => {
  init(connection, queue);
  return {
    getMessages: (queueName, cleanQueue) => new Promise((resolve) => {
      let messages = [];
      let i = 1;
      getChannel().then((ch) => {
        ch.consume(queueName, (msg) => {
          messages.push(msg);
          console.log(msg.content.toString())
        }, { noAck: cleanQueue }).then(() => {
          logger.info(`Retreived ${messages.length} messages from ${queueName}`);
          resolve(messages)
        })
      })
    })
    .
    .
    };
  };

Thanks in advance !

Upvotes: 0

Views: 7706

Answers (1)

idbehold
idbehold

Reputation: 17168

You can do it like this but it will be very slow and will possibly never resolve if messages are added to the queue faster than you can consume them. Essentially you keep getting one message at a time until channel.get() resolves with false instead of a message object:

getMessages: (queueName, cleanQueue) => {
  let messages = []
  let i = 1
  return getChannel().then(function getMessage (ch) {
    return ch.get(queueName, { noAck: cleanQueue }).then((msg) => {
      if (msg) {
        messages.push(msg)
        return getMessage(ch)
      } else {
        logger.info(`Retrieved ${messages.length} messages from ${queueName}`)
        return messages
      }
    })
  }).catch((err) => {
    err.consumedMessages = messages
    return Promise.reject(err)
  })
}

Upvotes: 1

Related Questions