Reputation: 43
Greetings, guys.
Can you help me with asynchronous code in Node.js?
Here is the problem:
I'm using the amqplib module to work with RabbitMQ. Its consume method delivers messages from RabbitMQ, but it first returns a promise that resolves once the consumer has started; only after that does it invoke the callback for each message. I don't know how to detect when all the messages have been delivered to my Node.js app.
To explain further, here is my code; the comment at the end describes what I want:
/**
 * Here is my test code
 *
 * requiring the amqp.node lib
 */
let amqp = require('amqplib')
  , configConnection = { /* ..config options */ }
  , queue = 'users'
  , exchange = 'users.exchange'
  , type = 'fanout';
// this semicolon is required before the immediately invoked function below

/**
 * declare an anonymous async function and call it immediately
 */
(async () => {
  /**
   * create the connection and the channel using async/await,
   * which node.js >= 8.5.0 supports
   */
  let conn = await amqp.connect(configConnection)
  let channel = await conn.createChannel()
  await channel.assertExchange(exchange, type)
  let response = await channel.assertQueue(queue)
  /**
   * response: { queue: 'users', messageCount: 10, consumerCount: 0 }
   */
  await channel.bindQueue(queue, exchange, '')
  // consume() resolves as soon as the consumer is registered,
  // not when the messages have been delivered
  response = await channel.consume(queue, logMessage, {noAck: false})
  /**
   * {noAck: false} means the broker expects an acknowledgement
   * (channel.ack) for each message
   */
  console.log('reading for query finish')

  function logMessage(msg) {
    console.log("[*] received: '%s'", msg.content.toString())
  }
})()
/**
 * the output will show:
 * reading for query finish
 * [*] received: 'message content'
 * [*] received: 'message content'
 * [*] received: 'message content'
 * ...
 *
 * But I need to show the message 'reading for query finish' only after
 * all the messages have been consumed
 *
 * Question: how can I do this?
 */
Upvotes: 2
Views: 4787
Reputation: 1
Maybe this will be helpful.
For me, this is the best way to get all the messages from a queue:
Step 1: Get messageCount
const queueInfo = await channel.checkQueue(queue);
Step 2: Push the messages to an array
let messages: Message[] = [];
await channel.consume(queue, async (msg: Message | null) => {
  // msg is null if the consumer was cancelled by the server
  if (msg !== null) {
    messages.push(msg);
  }
});
Step 3: Wait for completion using a while loop
// use < rather than != so the loop also ends if extra messages arrive in the meantime
while (messages.length < queueInfo.messageCount) {
  await delay(100);
}
Optional: this is my code for waiting
const delay = (ms: number) => {
  return new Promise(resolve => setTimeout(resolve, ms));
}
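One caveat with the polling loop in Step 3: if a message is lost or the count never matches, it spins forever. A minimal sketch of the same idea with an upper time limit, in plain JavaScript, might look like this. The name waitUntil and its options intervalMs and timeoutMs are made up for illustration and are not part of amqplib.

```javascript
// Hypothetical helper (not part of amqplib): poll `predicate` every
// `intervalMs` until it returns true or `timeoutMs` has elapsed.
// Resolves to true on success and to false on timeout.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function waitUntil(predicate, { intervalMs = 100, timeoutMs = 5000 } = {}) {
  const deadline = Date.now() + timeoutMs;
  while (!predicate()) {
    if (Date.now() >= deadline) {
      return false; // gave up: the condition never became true
    }
    await delay(intervalMs);
  }
  return true;
}

// Possible usage with the names from the steps above (not executed here):
// const done = await waitUntil(() => messages.length >= queueInfo.messageCount);
// if (!done) console.warn('timed out before all messages arrived');
```

Returning a boolean instead of throwing lets the caller decide whether a timeout is an error or just "process what we have so far".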
Upvotes: 0
Reputation: 1
Try the code below:
let amqp = require('amqplib/callback_api');

function init() {
  let configConnection = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'root',
    password: '1111',
    heartbeat: 60,
  };
  let messageCount = 0;
  let messageCounter = 0;
  let queue_name = 'queue_name';
  let messages = [];

  amqp.connect(configConnection, function (error, connect) {
    if (error) {
      throw error;
    }
    // Create a channel and get info about the queue
    connect.createChannel(function (error1, channel) {
      if (error1) {
        throw error1;
      }
      channel.assertQueue(queue_name, {durable: true}, (error2, result) => {
        if (error2) {
          throw error2;
        }
        // here you get the number of messages currently in the queue
        messageCount = result.messageCount;
        // Note: if the queue is empty, the callback below never fires
        // Consume from the queue
        channel.consume(queue_name, function (msg) {
          msg = msg.content.toString();
          messages.push(msg);
          // Show all messages and exit
          if (messageCount === ++messageCounter) {
            console.log(messages);
            process.exit();
          }
        }, {
          noAck: true
        });
      });
    });
  });
}

init();
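If the goal is only to read whatever is in the queue right now and then stop, a different technique from the consumer above may be simpler: fetch messages one at a time with channel.get, which resolves to false once the queue is empty. This is a sketch that assumes the promise API (require('amqplib'), not callback_api); the function name drainQueue is made up for illustration.

```javascript
// Sketch: read messages one by one with channel.get() until the queue is empty.
// `channel` is assumed to be a channel created with the amqplib promise API.
// drainQueue is a hypothetical name, not an amqplib function.
async function drainQueue(channel, queue) {
  const messages = [];
  for (;;) {
    // get() resolves to a message object, or to false when the queue is empty
    const msg = await channel.get(queue, { noAck: true });
    if (msg === false) {
      break;
    }
    messages.push(msg.content.toString());
  }
  return messages;
}

// Possible usage (not executed here):
// const all = await drainQueue(channel, 'queue_name');
// console.log(all);
```

This trades throughput for simplicity: each get is a round trip to the broker, so it suits small queues better than large ones.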
Upvotes: 0
Reputation: 43
I found the answer to my question here.
The answer is to use EventEmitter and a Promise.
The magic (for me) is here:
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))
So the final code is:
/**
 * Here is my test code
 *
 * requiring the amqp.node lib
 */
let amqp = require('amqplib')
  , EventEmitter = require('events')
  , eventEmitter = new EventEmitter()
  , timeout = 10000
  , configConnection = { /* ..config options */ }
  , queue = 'users'
  , exchange = 'users.exchange'
  , type = 'fanout';
// this semicolon is required before the immediately invoked function below

/**
 * declare an anonymous async function and call it immediately
 */
(async () => {
  /**
   * create the connection and the channel using async/await,
   * which node.js >= 8.5.0 supports
   */
  let conn = await amqp.connect(configConnection)
  let channel = await conn.createChannel()
  await channel.assertExchange(exchange, type)
  let response = await channel.assertQueue(queue)
  /**
   * response: { queue: 'users', messageCount: 10, consumerCount: 0 }
   */
  let messageCount = response.messageCount
  await channel.bindQueue(queue, exchange, '')
  response = await channel.consume(queue, logMessage(messageCount), {noAck: false})
  /**
   * {noAck: false} means the broker expects an acknowledgement
   * (channel.ack) for each message
   */
  /**
   * set a timeout in case the event is never emitted from the consumer;
   * we wait until 'consumeDone' is emitted once and the promise resolves,
   * so we can go on to the next step
   */
  setTimeout(() => eventEmitter.emit('consumeDone'), timeout)
  await new Promise(resolve => eventEmitter.once('consumeDone', resolve))
  console.log('reading for query finish')

  function logMessage(messageCount) {
    return msg => {
      console.log("[*] received: '%s'", msg.content.toString())
      // deliveryTag counts deliveries on this channel starting from 1,
      // so it equals messageCount on the last message that was already queued
      if (messageCount == msg.fields.deliveryTag) {
        eventEmitter.emit('consumeDone')
      }
    }
  }
})()
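One thing to watch in the code above: the timer is never cleared, so it still fires after a normal finish, and the caller cannot tell a real 'consumeDone' from a timeout. A minimal sketch of a helper that handles both, using only Node's built-in events module, could look like this. The name waitForEvent is made up for illustration.

```javascript
const EventEmitter = require('events');

// Hypothetical helper: resolves to true when `eventName` fires, or to false
// if `timeoutMs` passes first. The timer and the listener are cleaned up
// in both cases.
function waitForEvent(emitter, eventName, timeoutMs) {
  return new Promise((resolve) => {
    const onEvent = () => {
      clearTimeout(timer); // event arrived first: cancel the pending timeout
      resolve(true);
    };
    const timer = setTimeout(() => {
      emitter.removeListener(eventName, onEvent); // timeout first: drop the listener
      resolve(false);
    }, timeoutMs);
    emitter.once(eventName, onEvent);
  });
}

// Possible usage in place of the setTimeout/await pair above (not executed here):
// const finished = await waitForEvent(eventEmitter, 'consumeDone', timeout);
// console.log(finished ? 'reading for query finish' : 'timed out');
```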
Upvotes: 2