Reputation: 2078
I'm trying to learn RabbitMQ and it's .NET api. I've setup two console apps, one is a publisher and one is a consumer. Here is the Publisher:
class Publisher
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() {HostName = "localhost"};
using var connection = factory.CreateConnection();
for (int i = 0; i < 100; i++)
{
Thread.Sleep(100);
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "My test Queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "My test Queue",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
And the Consumer:
class Consumer
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
Thread.Sleep(1000);
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "My test Queue",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
So the Publisher is publishing 100 messages with a small delay (100ms) between each one. In the Consumer, I have a bigger delay, 1000ms. What I expected to see was the queue quickly fill up while the consumer chips away at it, handling one message per second until the queue is empty.
Instead, as soon as I run the consumer, the queue is instantly emptied, and in the console I see [x] Received {0}
logged every second.
I recorded a GIF where you can see that the queue is at 100, I launch the consumer, and then the queue is instantly at 0. But, you can see the consumer's action happening once every second.
Did the Consumer just completely empty the queue and is handling the messages one at a time off of memory? If so, how can I make the consumer not grab another item off the queue until the last action is done?
Upvotes: 0
Views: 2647
Reputation: 2020
Note: I'm not a RabitMQ expert at all, I've only experimented with it.
I think there are multiple ways of achieving this.
The smallest step from your example would probably be to turn off auto-acknowledgements (set autoAck: false
).
In automatic acknowledgement mode, a message is considered to be successfully delivered immediately after it is sent.
And
Another thing that's important to consider when using automatic acknowledgement mode is consumer overload. Manual acknowledgement mode is typically used with a bounded channel prefetch which limits the number of outstanding ("in progress") deliveries on a channel. With automatic acknowledgements, however, there is no such limit by definition. Consumers therefore can be overwhelmed by the rate of deliveries, potentially accumulating a backlog in memory and running out of heap or getting their process terminated by the OS. Some client libraries will apply TCP back pressure (stop reading from the socket until the backlog of unprocessed deliveries drops beyond a certain limit). Automatic acknowledgement mode is therefore only recommended for consumers that can process deliveries efficiently and at a steady rate.
Source: https://www.rabbitmq.com/confirms.html#acknowledgement-modes
You can read more about prefetching here: https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
You'll then need to manually aknowledge the processing of the message:
// this example assumes an existing channel (IModel) instance
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
// positively acknowledge a single delivery, the message will
// be discarded
channel.BasicAck(ea.DeliveryTag, false);
};
String consumerTag = channel.BasicConsume(queueName, false, consumer);
Source: https://www.rabbitmq.com/confirms.html#consumer-acks-api-elements
Upvotes: 1