Bob Horn
Bob Horn

Reputation: 34297

Why is only one RabbitMQ consumer receiving all the messages when I have two consumer threads going?

I'm running a test and trying to get two RabbitMQ consumers sharing the pulling of messages from the queue. However, every time I run this, only one of the subscribers/consumers process all of the messages. Why?

I have a separate process that puts 10 sample messages on the queue. I stop that, and then run this.

class Program
{
    static void Main(string[] args)
    {
        Task task1 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(2000));
        Task task2 = Task.Factory.StartNew(() => RabbitReceiver.PullFromQueue(1000));
        Task.WaitAll(task1, task2);

        Console.WriteLine("Press any key to close.");
        Console.ReadKey();
    }
}

And here is the consumer that each thread uses:

public static class RabbitReceiver
{
    public static void PullFromQueue(int sleepTimeInMilliseconds)
    {
        string hostName = "localhost";
        string queueName = "Sandbox.v1";

        var factory = new ConnectionFactory { HostName = hostName, UserName = "guest", Password = "guest", Port = 6003 };

        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", 864000000); // Not sure why I have to specify this. Got an exception if I didn't.

        channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, eventArgs) =>
        {
            var body = eventArgs.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Thread.Sleep(sleepTimeInMilliseconds);

            // This should probably go in a finally block.
            channel.BasicAck(eventArgs.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}

Upvotes: 0

Views: 2208

Answers (1)

bumblebee
bumblebee

Reputation: 1841

This happens if BasicQos is not set. By default, RabbitMQ dispatches all the messages to the first consumer when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer.

You can change this behaviour by setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.

channel.BasicQos(0, 1, false);

Reference: https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

Upvotes: 2

Related Questions