Reputation: 2528
The RabbitMQ documentation states the following:
"As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads."
Currently we are looking at the prefetch count. It has been recommended that if you have a small number of consumers and autoAck=false, you should prefetch many messages at once. However, we find that the prefetch count has no effect if the consumer processes messages and sends manual acknowledgements on a single thread of execution. If we wrap the consumer processing in a Task, the prefetch count does matter and substantially improves consumer performance.
See the following example where we are wrapping the consumption of the message by the consumer in a Task object:
using System;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "172.20.20.13",
            UserName = "billy",
            Password = "guest",
            Port = 5671,
            VirtualHost = "/",
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = "rabbit.blah.com",
                Version = System.Security.Authentication.SslProtocols.Tls12
            }
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        // Allow up to 100 unacknowledged messages per consumer.
        channel.BasicQos(0, 100, false);
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        var queueName = channel.QueueDeclare().QueueName;
        Console.WriteLine(" [*] Waiting for logs.");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // Hand the work to the thread pool so the handler returns immediately.
            // Note that BasicAck is then called from a thread-pool thread.
            Task.Run(() =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                System.Threading.Thread.Sleep(80); // simulated work
                channel.BasicAck(ea.DeliveryTag, false);
            });
        };
        channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
The question I have is: how do people implement consumers with the .NET RabbitMQ client that take advantage of prefetch counts? Do you have to ack manually from a Task of some sort, and is that safe?
Upvotes: 4
Views: 9402
Reputation: 9637
The documentation to which you refer is for the Java client. You should be referring to this document instead.
You're using the latest version of the .NET client (5.1), so doing your work in the Received event handler won't block other threads that deal with TCP data and it won't block heartbeats, both of which are good.
First of all, calling channel.BasicQos(0, 1, false) means your consumer will only receive one ready message at a time from RabbitMQ, and until BasicAck is called another message won't be delivered. So, there's really no reason to do your work in another thread since you're not going to get another message anyway.
If you increase the prefetch value (through experimentation and running benchmarks), you will have to do your work in a background thread if your work takes more than a few milliseconds to run.
When you do your work in the Received event callback, it's going to block the thread that is used to make that callback since the callback is not executed on its own thread. So, you can either ensure your work is very short, or do the work in another thread.
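To put rough numbers on this, here is a back-of-the-envelope sketch. PrefetchMath and MaxThroughput are hypothetical names made up for illustration, not part of the RabbitMQ client, and the model ignores network round trips and broker overhead, so treat the result as an upper bound only.

```csharp
using System;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
public static class PrefetchMath
{
    // Rough upper bound on messages per second for one consumer channel.
    // prefetch: the BasicQos prefetch count (0 means "no limit" in RabbitMQ).
    // concurrentWorkers: how many messages the application processes at once
    //   (1 if all work is done inline in the Received handler).
    // millisecondsPerMessage: processing time per message.
    public static double MaxThroughput(int prefetch, int concurrentWorkers, double millisecondsPerMessage)
    {
        // Messages that can be in progress at once are capped by both the
        // prefetch window and the number of workers.
        int inFlight = prefetch == 0 ? concurrentWorkers : Math.Min(prefetch, concurrentWorkers);
        return inFlight * 1000.0 / millisecondsPerMessage;
    }
}
```

With the 80 ms of work from the question, MaxThroughput(100, 1, 80) gives 12.5 messages per second no matter how large the prefetch is, because the handler processes one message at a time. MaxThroughput(100, 100, 80) gives 1250, which is why prefetch only starts to matter once the work runs concurrently.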
I just spent some time reviewing the .NET client code and I'm pretty sure the IModel instance is not thread safe. If you increase prefetch, you will have the opportunity to acknowledge several messages at the same time, so I recommend implementing a solution that uses that and also ensures that BasicAck is called on the same thread on which the connection is created.
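One possible way to do that is sketched below. AckCoalescer is a hypothetical helper, not something the .NET client provides. It assumes worker threads hand finished delivery tags to a single acknowledging thread (for example through a BlockingCollection<ulong>), and that this one thread is the only caller of both the helper and BasicAck. It relies on delivery tags being assigned per channel in increasing order starting at 1.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of RabbitMQ.Client.
// Not thread safe by design: call it only from the single acknowledging thread.
public sealed class AckCoalescer
{
    // Tags whose work has finished but that cannot be acked yet
    // because an earlier delivery is still being processed.
    private readonly SortedSet<ulong> _done = new SortedSet<ulong>();

    // Highest tag already covered by an acknowledgement (0 = none yet).
    private ulong _lastAcked;

    // Record a finished delivery. Returns the tag to acknowledge with
    // multiple: true, or null if a lower-numbered delivery is still in flight.
    public ulong? Complete(ulong deliveryTag)
    {
        _done.Add(deliveryTag);
        ulong? ackUpTo = null;
        // Advance over the contiguous run of finished tags.
        while (_done.Remove(_lastAcked + 1))
        {
            _lastAcked++;
            ackUpTo = _lastAcked;
        }
        return ackUpTo;
    }
}
```

The acknowledging thread would then loop over the finished tags, call Complete(tag) for each, and call channel.BasicAck(upTo.Value, true) whenever a value comes back. If work finishes in the order 2, 1, 3, 5, 4, the results are null, 2, 3, null, 5, so five messages are acknowledged with three BasicAck calls and no tag is ever acknowledged twice.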
Upvotes: 6
Reputation: 323
Source: https://www.rabbitmq.com/api-guide.html
When manual acknowledgements are used, it is important to consider what thread does the acknowledgement. If it's different from the thread that received the delivery (e.g. Consumer#handleDelivery delegated delivery handling to a different thread), acknowledging with the multiple parameter set to true is unsafe and will result in double-acknowledgements, and therefore a channel-level protocol exception that closes the channel. Acknowledging a single message at a time can be safe.
channel.basicAck(tag, false) is thread safe, but consumerChannel.basicAck(tag, true) is not.
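To see why the multiple flag makes the difference, here is a simplified model of the broker's per-channel bookkeeping. UnackedLedger is a hypothetical class written for illustration; it is not client or broker code, and it only mimics the rule that acknowledging a tag that is no longer outstanding is a channel-level error.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the broker's view of one channel.
public sealed class UnackedLedger
{
    private readonly SortedSet<ulong> _unacked = new SortedSet<ulong>();

    // Number of deliveries still waiting for an acknowledgement.
    public int Outstanding => _unacked.Count;

    public void Deliver(ulong deliveryTag) => _unacked.Add(deliveryTag);

    public void Ack(ulong deliveryTag, bool multiple)
    {
        // Acking a tag that is not outstanding models the protocol
        // exception that closes the channel on a real broker.
        if (!_unacked.Contains(deliveryTag))
            throw new InvalidOperationException(
                "PRECONDITION_FAILED - unknown delivery tag " + deliveryTag);

        if (multiple)
            _unacked.RemoveWhere(t => t <= deliveryTag); // everything up to and including the tag
        else
            _unacked.Remove(deliveryTag);                // only this one delivery
    }
}
```

Suppose deliveries 1, 2 and 3 are being processed on different threads. If the thread handling 3 finishes first and calls Ack(3, true), all three are acknowledged, and the later Ack(2, false) from another thread fails because 2 is no longer outstanding. With multiple set to false on every call, each thread touches only its own tag, so the order in which they finish does not matter.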
Some good points are also made in "RabbitMQ and channels Java thread safety".
Upvotes: 1