Brian Pilgren
Brian Pilgren

Reputation: 151

RabbitMQ: AsyncEventingBasicConsumer vs. EventingBasicConsumer

I have been doing a small sample application to consume messages on a queue in RabbitMQ.

The code should read the message an call a REST API (here replaced with a Task.Delay):

static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("..."),
        DispatchConsumersAsync = true
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    var consumer = new AsyncEventingBasicConsumer(channel);
    consumer.Received += async (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        await Task.Delay(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}

When I run this application with five messages on the queue I get the following result: Message order

The messages are processed one by one and with the 2 second delay it takes ~10 seconds. I would have expected to see five lines with Doing a fake API call... followed by five lines of Done with fake API call! with a total time of ~2 seconds.

When doing the synchronous version I see the exact same result - which was expected:

static void Main(string[] args)
{
    var factory = new ConnectionFactory
    {
        Uri = new Uri("...")
    };
    var connection = factory.CreateConnection();
    var channel = connection.CreateModel();

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, eventArgs) =>
    {
        Console.WriteLine("Doing a fake API call...");
        Thread.Sleep(2000);
        Console.WriteLine("Done with fake API call!");
        channel.BasicAck(eventArgs.DeliveryTag, false);
    };
    channel.BasicConsume("myQueue", false, consumer);
}

My question is: What is the difference in using the AsyncEventingBasicConsumer compared to the EventingBasicConsumer? And: Is there a way of making the consumer process other messages while awaiting work for previous messages?

Upvotes: 11

Views: 4311

Answers (1)

user18032108
user18032108

Reputation: 111

This is an old question and I'm sure you're not still waiting for an answer but I've found that it can be challenging to really nail down the details on how RabbitMQ behaves. As such, hoping at least some information here will help someone in the future.

RMQ EventingBasicConsumer and AsyncEventingBasicConsumer differ mostly in their signatures, so we can attach async handlers to events like Received and ConsumerCancelled. If you switch back and forth between them (and setting DispatchConsumersAsync appropriately), you might notice that there's no change in behavior. This is because the dispatcher underneath is invoking the events "asynchronously", but with no degree of concurrency.

To enable concurrent message handling on your RMQ connection, set ConsumerDispatchConcurrency = {>1} on the IConnectionFactory object prior to establishing the connection. It defaults to one, which is effectively serial processing.

Using your scenario, for example, if you set ConsumerDispatchConcurrency = 5 then I would suspect you'd see what you originally expected. Something like:

Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!
Done with fake API call!

While ConsumerDispatchConcurrency = 2 might look something more like this:

Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Doing a fake API call...
Done with fake API call!
Done with fake API call!
Doing a fake API call...
Done with fake API call!

Upvotes: 11

Related Questions