Reputation: 11923
I am trying to integrate RabbitMQ as a messaging queue in my existing microservice project. I currently have a Send
function written which takes a string message and publishes to a named queue.
Now, I am trying to write the Receive
function and here is what I have so far:
public static void Receive(string queueName)
{
using (IConnection connection = GetConnection(LOCALHOST))
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
// Don't dispatch a new message to a consumer until it has processed and acknowledged the previous one.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel); // non-blocking
consumer.Received += (model, e) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// At this point, I can do something with the message.
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
}
}
}
I think that is okay but I have a few questions on parts that I am confused on.
1) I don't quite understand what the Received
field is and why it is appending an anonymous function in which we do the actual work after receiving.
2) What is BasicConsume
doing? Does the actual receiving happen with BasicConsume
or the Received
field? Does BasicConsume
have to occur after the Received
field assign?
3) Finally, I have, say, two microservices that need to consume from a queue. I thought that I can just call Receive
in those two microservices respectively. Will it continually listen for messages or do I need to place the Receive
call in a while loop?
Thanks for your help and illumination.
Upvotes: 1
Views: 651
Reputation: 9195
1) Received
is actually an event. So, calling consumer.Received += (model, e) => {};
you're subscribing to it, but it's not necesseraly an anonimous function, it can be like:
consumer.Received += OnReceived;
....
private static void OnReceived(object model, BasicDeliverEventArgs e)
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// At this point, I can do something with the message.
}
It is executed each time you recieve a message.
2) BasicConsume
starts a consumer of your channel, which you created before. The function in Recieved will be executed.
3) They will continually listen the channel in case of using EventingBasicConsumer
. It may require a loop for another type of Consumer
Upvotes: 3