I have been writing a small sample application to consume messages from a queue in RabbitMQ.
The code should read the message and call a REST API (here replaced with a Task.Delay):
static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("..."),
        DispatchConsumersAsync = true
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();
    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += async (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        await Task.Delay(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}
When I run this application with five messages on the queue I get the following result:
The messages are processed one by one and with the 2 second delay it takes ~10 seconds.
I would have expected to see five lines of "Doing a fake API call..." followed by five lines of "Done with fake API call!", with a total time of ~2 seconds.
When doing the synchronous version I see the exact same result - which was expected:
static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("...")
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        Thread.Sleep(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}
My question is: What is the difference in using the AsyncEventingBasicConsumer compared to the EventingBasicConsumer?
And: Is there a way of making the consumer process other messages while awaiting work for previous messages?
Upvotes: 11
This is an old question and I'm sure you're not still waiting for an answer but I've found that it can be challenging to really nail down the details on how RabbitMQ behaves. As such, hoping at least some information here will help someone in the future.
RMQ EventingBasicConsumer and AsyncEventingBasicConsumer differ mostly in their signatures: the async variant lets us attach async handlers to events like Received and ConsumerCancelled. If you switch back and forth between them (setting DispatchConsumersAsync appropriately each time), you might notice that there's no change in behavior. This is because the dispatcher underneath is invoking the events "asynchronously", but with no degree of concurrency: it awaits each handler before dispatching the next message.
To enable concurrent message handling on your RMQ connection, set ConsumerDispatchConcurrency to a value greater than 1 on the IConnectionFactory object prior to establishing the connection. It defaults to one, which is effectively serial processing.
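As a minimal sketch, here is the async consumer from the question with that one setting added. It assumes a RabbitMQ.Client 6.x release that exposes ConsumerDispatchConcurrency on ConnectionFactory (newer major versions have a different, fully async API). The Uri value is elided in the question and is left as-is, and the Console.ReadLine() at the end is my addition to keep the process alive while messages arrive.

```csharp
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory
        {
            Uri = new Uri("..."),            // elided in the question; supply your own broker URI
            DispatchConsumersAsync = true,   // needed for AsyncEventingBasicConsumer
            ConsumerDispatchConcurrency = 5  // must be set before CreateConnection()
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, eventArgs) =>
        {
            Console.WriteLine("Doing a fake API call...");
            await Task.Delay(2000);
            Console.WriteLine("Done with fake API call!");
            channel.BasicAck(eventArgs.DeliveryTag, false);
        };
        channel.BasicConsume("myQueue", false, consumer);

        // Assumption: block the main thread so the consumer keeps running.
        Console.ReadLine();
    }
}
```

The setting applies to connections created after it is assigned, so changing it on the factory afterwards should not affect a connection that is already open.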
Using your scenario, for example, if you set ConsumerDispatchConcurrency = 5 then I would suspect you'd see what you originally expected. Something like:
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!
While ConsumerDispatchConcurrency = 2 might look something more like this:
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Done with fake API call!
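If you want to get a feel for these patterns without a broker, the effect of a concurrency limit can be imitated with plain .NET. The sketch below is not how the RabbitMQ client's dispatcher is implemented. It only caps the number of handlers in flight with a SemaphoreSlim, and the names DispatchSimulation, HandleAsync and RunAsync are made up for the illustration. Message ids are added to the output so the interleaving is easier to follow.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class DispatchSimulation
{
    // Stand-in for the Received handler from the question.
    static async Task HandleAsync(int id)
    {
        Console.WriteLine($"[{id}] Doing a fake API call...");
        await Task.Delay(2000);
        Console.WriteLine($"[{id}] Done with fake API call!");
    }

    // Runs messageCount handlers with at most `concurrency` in flight at once.
    static async Task RunAsync(int messageCount, int concurrency)
    {
        using var slots = new SemaphoreSlim(concurrency);

        var tasks = Enumerable.Range(1, messageCount).Select(async id =>
        {
            await slots.WaitAsync();        // wait for a free slot
            try { await HandleAsync(id); }
            finally { slots.Release(); }    // free the slot for the next message
        }).ToArray();                       // ToArray starts all the tasks

        await Task.WhenAll(tasks);
    }

    static async Task Main()
    {
        // Try 1, 2 and 5 to compare against the outputs described above.
        await RunAsync(messageCount: 5, concurrency: 2);
    }
}
```

With a limit of 1 the handlers run strictly one after another (~10 seconds for five messages), while a limit of 5 lets all five start before any finishes (~2 seconds). The exact ordering of lines between concurrent handlers is not guaranteed.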
Upvotes: 11