noblerare
noblerare

Reputation: 11923

RabbitMQ - Consuming messages from queue in microservices

I am trying to integrate RabbitMQ as a messaging queue in my existing microservice project. I currently have a Send function written which takes a string message and publishes to a named queue.

Now, I am trying to write the Receive function and here is what I have so far:

public static void Receive(string queueName)
{
    using (IConnection connection = GetConnection(LOCALHOST))
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            // Don't dispatch a new message to a consumer until it has processed and acknowledged the previous one.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 

            var consumer = new EventingBasicConsumer(channel); // non-blocking

            consumer.Received += (model, e) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                // At this point, I can do something with the message.
            };

            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }
    }
}

I think that is okay but I have a few questions on parts that I am confused on.

1) I don't quite understand what the Received field is and why it is appending an anonymous function in which we do the actual work after receiving.

2) What is BasicConsume doing? Does the actual receiving happen with BasicConsume or the Received field? Does BasicConsume have to occur after the Received field assign?

3) Finally, I have, say, two microservices that need to consume from a queue. I thought that I can just call Receive in those two microservices respectively. Will it continually listen for messages or do I need to place the Receive call in a while loop?

Thanks for your help and illumination.

Upvotes: 1

Views: 651

Answers (1)

Alex Riabov
Alex Riabov

Reputation: 9195

1) Received is actually an event. So, calling consumer.Received += (model, e) => {}; you're subscribing to it, but it's not necesseraly an anonimous function, it can be like:

consumer.Received += OnReceived;

....

private static void OnReceived(object model, BasicDeliverEventArgs e)
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    // At this point, I can do something with the message.
}

It is executed each time you recieve a message.

2) BasicConsume starts a consumer of your channel, which you created before. The function in Recieved will be executed.

3) They will continually listen the channel in case of using EventingBasicConsumer. It may require a loop for another type of Consumer

Upvotes: 3

Related Questions