Pragmatic

Reputation: 3127

Concurrent message processing in RabbitMQ consumer

I am new to RabbitMQ, so please excuse me if my question sounds trivial. I want to publish messages to RabbitMQ that will be processed by a RabbitMQ consumer.

My consumer machine is a multi-core machine (preferably a worker role on Azure), but QueueingBasicConsumer hands me one message at a time. How can I use all the cores so that multiple messages are processed concurrently?

  1. One solution could be to open multiple channels in multiple threads and process messages there. But in that case, how do I decide the number of threads?
  2. Another approach could be to read messages on the main thread, then create a task for each message and pass the message to it. In that case I would have to stop consuming once the number of messages in progress passes a threshold. I am not sure how this could be implemented.

Thanks In Advance

Upvotes: 3

Views: 7522

Answers (1)

Bojan Resnik

Reputation: 7388

Your second option sounds much more reasonable - consume on a single channel and spawn multiple tasks to handle the messages. To implement concurrency control, you could use a semaphore to control the number of tasks in flight. Before starting a task, you would wait for the semaphore to become available, and after a task has finished, it would signal the semaphore to allow other tasks to run.

You haven't specified your language/technology stack of choice, but whatever you do - try to utilise a thread pool instead of creating and managing threads yourself. In .NET, that would mean using Task.Run to process messages asynchronously.

Example C# code:

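using (var semaphore = new SemaphoreSlim(MaxMessages))
{
    while (true)
    {
        // Block until the next delivery arrives.
        var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        // Block while MaxMessages tasks are already in flight.
        semaphore.Wait();
        Task.Run(() => ProcessMessage(args))
            // ContinueWith passes the finished task, so the lambda takes one argument.
            .ContinueWith(t => semaphore.Release());
    }
}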
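
To check that the semaphore really caps the number of tasks in flight, the same pattern can be tried without a broker. The following is only a sketch: ThrottleDemo, Run, and the 20 ms sleep are made-up stand-ins for the consumer loop and ProcessMessage, not part of RabbitMQ or of the code above. It releases the semaphore in a finally block, so a failing handler does not leak a slot.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleDemo
{
    // Processes messageCount simulated messages with at most maxMessages
    // in flight, and returns the highest concurrency actually observed.
    public static int Run(int messageCount, int maxMessages)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;
        var tasks = new List<Task>();

        using (var semaphore = new SemaphoreSlim(maxMessages))
        {
            for (int i = 0; i < messageCount; i++)
            {
                // Blocks while maxMessages tasks are already running.
                semaphore.Wait();
                tasks.Add(Task.Run(() =>
                {
                    lock (gate)
                    {
                        inFlight++;
                        peak = Math.Max(peak, inFlight);
                    }
                    try
                    {
                        Thread.Sleep(20); // stand-in for ProcessMessage(args)
                    }
                    finally
                    {
                        // Count down before freeing the slot, so inFlight
                        // can never exceed maxMessages.
                        lock (gate) { inFlight--; }
                        semaphore.Release();
                    }
                }));
            }

            // Wait for all tasks before the semaphore is disposed.
            Task.WaitAll(tasks.ToArray());
        }

        return peak;
    }
}
```

Calling ThrottleDemo.Run(20, 4) should return a value no greater than 4, whatever the thread pool decides to do.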

Instead of controlling the concurrency level yourself, you might find it easier to enable explicit ACK control on the channel and use RabbitMQ Consumer Prefetch (basic.qos) to set the maximum number of unacknowledged messages. This way, the broker will never deliver more messages than that limit until you acknowledge some of the earlier ones.
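
A rough sketch of the prefetch approach, assuming a version of the RabbitMQ .NET client that still ships QueueingBasicConsumer (it was removed in later releases). PrefetchConsumer, Run, MaxMessages, and ProcessMessage are placeholder names, and the channel and queue are assumed to be set up elsewhere. Since a channel should not be used from several threads at once, the acknowledgement is wrapped in a lock.

```csharp
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

static class PrefetchConsumer
{
    // Hypothetical limit on unacknowledged messages; tune for your workload.
    const ushort MaxMessages = 8;

    public static void Run(IModel channel, string queueName)
    {
        // prefetchSize 0 = no byte limit, global false = limit applies per consumer.
        channel.BasicQos(0, MaxMessages, false);

        var consumer = new QueueingBasicConsumer(channel);
        // Second argument false = manual acknowledgements.
        channel.BasicConsume(queueName, false, consumer);

        var ackLock = new object();
        while (true)
        {
            var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
            Task.Run(() =>
            {
                ProcessMessage(args);
                // Acknowledge only after the work is done; this frees one
                // prefetch slot so the broker can deliver the next message.
                lock (ackLock)
                {
                    channel.BasicAck(args.DeliveryTag, false);
                }
            });
        }
    }

    // Placeholder for the application-specific handler.
    static void ProcessMessage(BasicDeliverEventArgs args) { }
}
```

With this setup no semaphore is needed: the broker stops delivering once MaxMessages deliveries are unacknowledged. If ProcessMessage throws, the message is never acknowledged here, so real code would need to decide whether to reject or requeue it.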

Upvotes: 2
