bommina
bommina

Reputation: 317

C# Consuming / Reading message every x interval with y batch size From Rabbit MQ

I was using Rabbit MQ for processing messages in sequential order. Now I have scenario, where don't need to consume message immediately, but I would like to consume/ read messages from MQ every 1(x) minute with batch size of 20(y)

So I can process this 20 messages in single time and save to DB in single call instead of calling 20 times for each message.

So how to receive/consume messages in batch on every x interval.

I have seen following information, Consume messages in batches - RabbitMQ and Rabbitmq retrieve multiple messages using single synchronous call using .NET

I tried to implement 2nd question implementation (questions/32309155) but not working, didn't understand "consumer.Received" will receive **_fetchSize ==> 20 ** means, it will receive 20 messages in single read? or how it will work, because I changed fetchsize to 10, but consumer.received is receiving single message.

using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.BasicQos(0, 1, false);
                channel.ExchangeDeclare("helloExchange", type:"direct");
                channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false,
                    arguments: null);

                channel.QueueBind("hello", "helloExchange", routingKey:"hello");

                var consumer = new EventingBasicConsumer(channel);

                consumer.Received += (model, ea) =>
                {
                    bool canAck = false;
                    var retryCount = 0;



                    try
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                       // DO PROCESS MESSAGE HERE
                        Console.WriteLine($"{typeof(MyConsumer).Name} Message consumed {message}");
                        canAck = true;
                    }
                    catch (Exception ex)
                    {canAck = false;
                      // LOG ERROR
                    }

                    try
                    {
                        if (canAck)
                        {
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                        else
                        {
                            channel.BasicNack(ea.DeliveryTag, false, false);
                        }
                    }
                    catch (AlreadyClosedException ex)
                    {
                        Console.WriteLine(ex.Message + " >> RabbitMQ is closed!");
                    }
                };
                channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

Upvotes: 1

Views: 1734

Answers (1)

qbsp
qbsp

Reputation: 312

The consumer will receive always only one message at a time. The fetch size will determine how many unacknowledged message can be delivered on the channel to the consumer.

So, consider a fetch size of 3 for example, 3 messages will be delivered but if you do not acknowledge any, means that even if you get a new message coming in the queue (4th) it will not be sent to the channel (thus not consumed) until you acknowledge at least one of the 3 initial messages.

In your case to be able to handle your scenario and consume messages in batch you could do something like:

  • set a fetchSize of 20
  • any time you get a new message you save append to a List
  • once you reach the limit (20) you start processing it and acknowledge all the messages in the batch (thus clean the list)
  • when the batch is completed and all the messages are acknowledged, you'll start to receive new messages (then start the flow again)

Even tho this is technically possible I'll not go for this implementation as it will make more complicated to handle retries/error handling etc. Using a message queue you have the option to make your code atomic, idempotent and resilient which will lead to an easier error/retry handling.

Upvotes: 2

Related Questions