Andrio

Reputation: 2078

Why is my RabbitMQ consumer (seemingly) consuming all messages at once?

I'm trying to learn RabbitMQ and its .NET API. I've set up two console apps: one is a publisher and one is a consumer. Here is the Publisher:

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

class Publisher
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() {HostName = "localhost"};
        using var connection = factory.CreateConnection();
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(100);
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: "My test Queue",
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                routingKey: "My test Queue",
                basicProperties: null,
                body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

And the Consumer:

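using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Consumer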
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            Thread.Sleep(1000);
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);
        };
        channel.BasicConsume(queue: "My test Queue",
            autoAck: true,
            consumer: consumer);
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

So the Publisher is publishing 100 messages with a small delay (100ms) between each one. In the Consumer, I have a bigger delay, 1000ms. What I expected to see was the queue quickly fill up while the consumer chips away at it, handling one message per second until the queue is empty.

Instead, as soon as I run the consumer, the queue is instantly emptied, and in the console I see [x] Received {0} logged every second.

I recorded a GIF where you can see that the queue is at 100, I launch the consumer, and then the queue is instantly at 0. But, you can see the consumer's action happening once every second.

[Animated GIF: the queue being instantly emptied by the consumer]

Did the Consumer just completely empty the queue and is handling the messages one at a time off of memory? If so, how can I make the consumer not grab another item off the queue until the last action is done?

Upvotes: 0

Views: 2647

Answers (1)

Stanislas

Reputation: 2020

Note: I'm not a RabbitMQ expert at all; I've only experimented with it.

I think there are multiple ways of achieving this.

Manual acknowledgement

The smallest step from your example would probably be to turn off auto-acknowledgements (set autoAck: false).

In automatic acknowledgement mode, a message is considered to be successfully delivered immediately after it is sent.

And

Another thing that's important to consider when using automatic acknowledgement mode is consumer overload. Manual acknowledgement mode is typically used with a bounded channel prefetch which limits the number of outstanding ("in progress") deliveries on a channel. With automatic acknowledgements, however, there is no such limit by definition. Consumers therefore can be overwhelmed by the rate of deliveries, potentially accumulating a backlog in memory and running out of heap or getting their process terminated by the OS. Some client libraries will apply TCP back pressure (stop reading from the socket until the backlog of unprocessed deliveries drops beyond a certain limit). Automatic acknowledgement mode is therefore only recommended for consumers that can process deliveries efficiently and at a steady rate.

Source: https://www.rabbitmq.com/confirms.html#acknowledgement-modes
You can read more about prefetching here: https://www.rabbitmq.com/confirms.html#channel-qos-prefetch

You'll then need to manually acknowledge each message once it has been processed:

// this example assumes an existing channel (IModel) instance
// and a queueName string naming the queue to consume from

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
    var body = ea.Body.ToArray();
    // positively acknowledge a single delivery, the message will
    // be discarded
    channel.BasicAck(ea.DeliveryTag, false);
};
// autoAck is false, so deliveries stay unacknowledged until BasicAck is called
string consumerTag = channel.BasicConsume(queueName, false, consumer);

Source: https://www.rabbitmq.com/confirms.html#consumer-acks-api-elements
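Note that manual acknowledgement on its own may not be enough: with no prefetch limit set, the broker can still push every queued message to the consumer, where they wait unacknowledged. To make the consumer hold only one message at a time, combine manual acks with a bounded prefetch, as the quoted documentation suggests. Below is a minimal sketch of the consumer from the question adapted that way. It assumes the same RabbitMQ.Client API the question uses (IModel, EventingBasicConsumer), a broker on localhost, and that the queue already exists; the prefetch count of 1 is chosen for illustration, not as a recommended value.

```csharp
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Consumer
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Allow at most one unacknowledged delivery on this channel at a time.
        // prefetchSize 0 means no byte-size limit; global false applies the
        // limit per consumer rather than to the whole channel.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(1000); // simulated slow work, as in the question
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine(" [x] Received {0}", message);

            // Acknowledge only after the work is done, so the broker is
            // free to send the next message.
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };

        // autoAck: false switches the consumer to manual acknowledgement mode.
        channel.BasicConsume(queue: "My test Queue",
            autoAck: false,
            consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
```

With this setup the management UI should show one message as "Unacked" and the rest as "Ready", with the ready count dropping by roughly one per second instead of going straight to 0.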

Alternatives

Upvotes: 1
