Reputation: 317
I have been using RabbitMQ to process messages in sequential order. Now I have a scenario where I don't need to consume each message immediately; instead I would like to consume/read messages from the queue every 1 (x) minute with a batch size of 20 (y).
That way I can process these 20 messages at once and save them to the DB in a single call, instead of calling the DB 20 times, once per message.
So how can I receive/consume messages in batches at every interval x?
I have seen the following questions: "Consume messages in batches - RabbitMQ" and "Rabbitmq retrieve multiple messages using single synchronous call using .NET".
I tried the implementation from the second question (questions/32309155), but it is not working. I don't understand whether "consumer.Received" with _fetchSize = 20 means it will receive 20 messages in a single read, or how it is supposed to work: I changed the fetch size to 10, but consumer.Received still receives a single message at a time.
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.BasicQos(0, 1, false);
        channel.ExchangeDeclare("helloExchange", type: "direct");
        channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false,
            arguments: null);
        channel.QueueBind("hello", "helloExchange", routingKey: "hello");
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            bool canAck = false;
            var retryCount = 0;
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                // DO PROCESS MESSAGE HERE
                Console.WriteLine($"{typeof(MyConsumer).Name} Message consumed {message}");
                canAck = true;
            }
            catch (Exception ex)
            {
                canAck = false;
                // LOG ERROR
            }
            try
            {
                if (canAck)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    channel.BasicNack(ea.DeliveryTag, false, false);
                }
            }
            catch (AlreadyClosedException ex)
            {
                Console.WriteLine(ex.Message + " >> RabbitMQ is closed!");
            }
        };
        channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
Upvotes: 1
Views: 1734
Reputation: 312
The consumer will always receive only one message at a time. The fetch size (the prefetch count passed to BasicQos) determines how many unacknowledged messages can be outstanding on the channel for that consumer.
So, with a fetch size of 3 for example, up to 3 messages will be delivered. If you do not acknowledge any of them, then even when a new (4th) message arrives in the queue, it will not be sent to the channel (and thus not consumed) until you acknowledge at least one of the 3 initial messages.
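To make the prefetch behaviour concrete, here is a minimal sketch. It assumes a 5.x/6.x RabbitMQ.Client (the `CreateModel` API used in the question), a broker on `localhost`, and that the `hello` queue from the question already exists; the class name `PrefetchDemo` is made up for illustration.

```csharp
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class PrefetchDemo
{
    static void Main()
    {
        // Assumption: broker reachable on localhost with default credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // At most 3 unacknowledged messages may be outstanding for this consumer.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 3, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // The handler is invoked once per message, whatever the prefetch count is.
                Console.WriteLine($"Received delivery tag {ea.DeliveryTag}");

                // Acknowledging frees one slot, so the broker may push another message.
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
            Console.ReadLine();
        }
    }
}
```

If the `BasicAck` call is commented out, the handler should fire for three messages and then stop until the channel is closed, which is the behaviour described above.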
In your case, to handle your scenario and consume messages in batches, you could do something like:
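One possible way to do this, as a rough sketch rather than a definitive implementation, is to poll the queue with `BasicGet` on a fixed interval and acknowledge the whole batch with a single multiple-ack. It assumes a 5.x/6.x RabbitMQ.Client, a broker on `localhost`, and the existing `hello` queue; `BatchPoller` and `SaveBatch` are hypothetical names, and `SaveBatch` stands in for your single DB call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;   // lets Body.ToArray() compile when Body is byte[] (client 5.x)
using System.Text;
using System.Threading;
using RabbitMQ.Client;

class BatchPoller
{
    const int BatchSize = 20;                                      // y
    static readonly TimeSpan Interval = TimeSpan.FromMinutes(1);   // x

    static void Main()
    {
        // Assumption: broker reachable on localhost with default credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            while (true)
            {
                var batch = new List<string>();
                ulong lastTag = 0;

                // Pull up to BatchSize messages; BasicGet returns null when the queue is empty.
                while (batch.Count < BatchSize)
                {
                    BasicGetResult result = channel.BasicGet("hello", false);
                    if (result == null) break;
                    batch.Add(Encoding.UTF8.GetString(result.Body.ToArray()));
                    lastTag = result.DeliveryTag;
                }

                if (batch.Count > 0)
                {
                    try
                    {
                        SaveBatch(batch);
                        // multiple: true acks every unacked delivery up to lastTag on this channel.
                        channel.BasicAck(lastTag, multiple: true);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("Batch failed: " + ex.Message);
                        // Design choice for this sketch: put the whole batch back on the queue.
                        channel.BasicNack(lastTag, multiple: true, requeue: true);
                    }
                }

                Thread.Sleep(Interval);
            }
        }
    }

    // Hypothetical placeholder for the single database call.
    static void SaveBatch(IReadOnlyList<string> messages)
    {
        Console.WriteLine($"Saving {messages.Count} messages in one call");
    }
}
```

Note that a failure requeues all messages in the batch, including ones that would have succeeded on their own, so the save needs to tolerate seeing the same message more than once.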
Even though this is technically possible, I would not go for this implementation, as it makes retries and error handling more complicated. With a message queue you have the option of processing one message at a time and making your code atomic, idempotent and resilient, which leads to simpler error and retry handling.
Upvotes: 2