ZedZip
ZedZip

Reputation: 6462

RabbitMQ: How can multiple consumers receive messages from the same queue?

I run the producer, it generates N messages, and I see them on the dashboard. When I run a receiver, it receives all the messages from the queue, and the queue is then empty.

    /// <summary>
    /// Opens a connection and channel, attaches a <see cref="MessageReceiver"/> to the
    /// given queue and keeps consuming until the user presses Enter.
    /// </summary>
    /// <param name="QueName">Name of the queue to consume from.</param>
    static void Receive(string QueName)
    {
        ConnectionFactory connectionFactory = new ConnectionFactory
        {
            HostName = HostName,
            UserName = UserName,
            Password = Password,
        };

        // Both the connection and the channel are disposable; release them when
        // the method exits instead of leaking them.
        using (var connection = connectionFactory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Prefetch count of 1 (same setting the Worker sample describes as
            // "take 1 message per consumer").
            channel.BasicQos(0, 1, false);

            MessageReceiver messageReceiver = new MessageReceiver(channel);

            // Second argument is autoAck = false: the receiver acknowledges each
            // delivery itself via BasicAck.
            channel.BasicConsume(QueName, false, messageReceiver);

            // Block here so the consumer stays registered until the user quits.
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

// Receiver
 /// <summary>
 /// Consumer that prints the details of every delivery to the console and then
 /// acknowledges that delivery on the channel supplied at construction time.
 /// </summary>
 public class MessageReceiver : DefaultBasicConsumer
    {
        // Visual separator printed before and after each message dump.
        private const string Separator = "------------------------------";

        private readonly IModel _channel;

        /// <summary>Creates a receiver that acknowledges deliveries on <paramref name="channel"/>.</summary>
        /// <param name="channel">Channel used to send the acknowledgements.</param>
        public MessageReceiver(IModel channel)
        {
            _channel = channel;
        }

        /// <summary>
        /// Writes the delivery metadata and the UTF-8 decoded body to the console,
        /// then acknowledges this single delivery.
        /// </summary>
        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            Console.WriteLine(Separator);
            Console.WriteLine("Consuming Message");
            Console.WriteLine($"Message received from the exchange {exchange}");
            Console.WriteLine($"Consumer tag: {consumerTag}");
            Console.WriteLine($"Delivery tag: {deliveryTag}");
            Console.WriteLine($"Routing tag: {routingKey}");

            // Copy the payload into an array so it can be decoded as UTF-8 text.
            byte[] payload = body.ToArray();
            string text = Encoding.UTF8.GetString(payload);
            Console.WriteLine($"Message: {text}");
            Console.WriteLine(Separator);

            // multiple = false: acknowledge only this delivery tag.
            _channel.BasicAck(deliveryTag, false);
        }
    }

I need to have multiple producers that publish messages to the same queue, and multiple consumers that receive messages from that queue. Messages will be deleted by the queue TTL. But right now the first receiver gets all the messages from the queue. How can I do this?

Upvotes: 0

Views: 4342

Answers (2)

Shahram
Shahram

Reputation: 55

We use an “exchange” here just to show the exchange mechanics in the same sample; it is not really needed for the task (check the Worker2 project: it works with another queue, which is bound to the same exchange):

// Declare the "logs" exchange as a fanout exchange (plain ASCII quotes so the literal compiles).
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

Full sample of consumption

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Timers;

/// <summary>
/// Sample worker that opens one connection and registers several consumers
/// (one channel each) on the same queue, so the queue's messages are spread
/// across them.
/// </summary>
class Worker
{
    /// <summary>
    /// Declares the "logs" fanout exchange and the "test1" queue, attaches the
    /// consumers, and keeps running until the user presses Enter.
    /// </summary>
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "localhost", UserName = "user", Password = "password",
            // DispatchConsumersAsync = true
        };

        // The timer and the connection are disposable; release both on exit.
        // Closing the connection is expected to close the channels opened on it.
        using (var aTimer = new System.Timers.Timer())
        using (var connection = factory.CreateConnection())
        {
            // Test of timer handler. The timer stays disabled unless the
            // Enabled line below is uncommented.
            aTimer.Elapsed += new ElapsedEventHandler((source, e)
                                    => Console.Write("Timer Test"));
            aTimer.Interval = 3000;
            // aTimer.Enabled = true;

            // Add multiple consumers, so that the queue can be processed
            // "in parallel".
            for (int i = 1; i < 10; i++)
            {
                // Copy the loop variable so each handler captures its own value.
                var j = i;
                var channel = connection.CreateModel();

                channel.ExchangeDeclare(exchange: "logs",
                                        type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare("test1", durable: true,
                                    autoDelete: false, exclusive: false).QueueName;

                // take 1 message per consumer
                channel.BasicQos(0, 1, false);

                channel.QueueBind(queue: queueName,
                        exchange: "logs",
                        routingKey: "");
                Console.WriteLine($" [*] Waiting for messages in {j}");

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($" [x] Received in {j} -> {message} at {DateTime.Now}");

                    // Simulated long-running work.
                    // await Task.Delay(3000);
                    Thread.Sleep(10000);
                    // async works too

                    if (j == 5)
                    {
                        // Test special case of returning an item to the queue:
                        // we received the message but did not process it for
                        // some reason. QOS is 1, so our consumer is already
                        // full. We need to return the message to the queue so
                        // that another consumer can work with it.
                        Console.WriteLine($"[-] CANT PROCESS {j} consumer! Error with -> {message}");
                        channel.BasicNack(deliveryTag: ea.DeliveryTag,
                                          multiple: false, requeue: true);
                    }
                    else
                    {
                        Console.WriteLine($" [x] Done {j} -> {message} at {DateTime.Now}");

                        // here channel could also be accessed as
                        // ((EventingBasicConsumer)sender).Model
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                };

                // Manual acknowledgements: the handler above acks or nacks.
                channel.BasicConsume(queue: queueName, autoAck: false,
                                     consumer: consumer);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

full example in link

Upvotes: 0

ZedZip
ZedZip

Reputation: 6462

The best solution is: every client should have its own queue, maybe with a TTL, maybe with an expiration parameter.

Upvotes: 1

Related Questions