Gopi

Reputation: 5877

Consume messages in batches - RabbitMQ

Using the code below, I was able to consume messages that multiple producers send to the same exchange with different routing keys, and insert each message into the database.

But this consumes too many resources, because the messages are inserted into the DB one after the other. So I decided to switch to batch inserts, and I found that I can set BasicQos.

After setting the message limit to 10 in BasicQos, I expected Console.WriteLine to write 10 messages, but it does not behave as expected.

What I want is to consume N messages from the queue, do a bulk insert, and send an ACK on success, otherwise a NACK.

Here is the piece of code I use.

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);

        consumer.Received += (model, ea) =>
        {
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                // Insert into Database

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Receiver Ack  " + ea.DeliveryTag);
            }
            catch (Exception e)
            {
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Receiver No Ack  " + ea.DeliveryTag);
            }
        };

        Console.ReadLine();
    }
}

Upvotes: 9

Views: 20343

Answers (2)

cinoy

Reputation: 11

Batch-size-based consumption can be set up using channel.basicQos().

Channel channel = connection.createChannel();
channel.basicQos(10);

It specifies the maximum number of messages that can be delivered to the consumer without an ACK having been sent for each of them.

Use the DefaultConsumer class and override its methods.

Consumer batchConsumer = new DefaultConsumer(channel) {

  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
  }

  @Override
  public void handleCancelOk(String consumerTag) {
 
  }
};

Start consuming with channel.basicConsume():

channel.basicConsume(QUEUE_NAME, false, batchConsumer);

Once channel.basicConsume() is called, the broker delivers up to 10 unacknowledged messages to the consumer; handleDelivery is still invoked once per message. 'false' disables auto-ack, so that a single ACK can be sent after the entire batch has been consumed.

channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);

Here 'true' means the ACK covers multiple messages: every message up to and including the given delivery tag.

Detailed explanation can be found in

RabbitMQ Batch Consumption

Upvotes: 0

Gabriele Santomaggio

Reputation: 22682

BasicQos = 10 means that the client fetches at most 10 messages at a time, but when you consume them you will still see one message at a time. Read here: https://www.rabbitmq.com/consumer-prefetch.html

AMQP specifies the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming (aka "prefetch count").
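To illustrate the prefetch window, here is a minimal stdlib-only simulation. It does not use the RabbitMQ client at all; the class `PrefetchWindowSketch` and its methods are made up for this illustration. It only models the rule quoted above: deliveries stop once the number of unacknowledged messages reaches the prefetch count, and resume after an ack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a broker-side prefetch window (not the RabbitMQ API).
class PrefetchWindowSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int prefetch;   // plays the role of the basic.qos prefetch count
    private int unacked = 0;      // delivered but not yet acknowledged
    private int delivered = 0;    // total deliveries so far

    PrefetchWindowSketch(int prefetch, int queuedMessages) {
        this.prefetch = prefetch;
        for (int i = 1; i <= queuedMessages; i++) {
            queue.add("msg-" + i);
        }
    }

    // Delivers messages one by one until the window is full or the queue is empty.
    // Returns how many messages were delivered by this call.
    int pump() {
        int pushed = 0;
        while (unacked < prefetch && !queue.isEmpty()) {
            queue.poll();         // each message is still a separate delivery
            unacked++;
            delivered++;
            pushed++;
        }
        return pushed;
    }

    // Models a cumulative ack that settles everything delivered so far.
    void ackAll() {
        unacked = 0;
    }

    int delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        PrefetchWindowSketch broker = new PrefetchWindowSketch(10, 25);
        System.out.println(broker.pump()); // 10: the window fills up
        System.out.println(broker.pump()); // 0: nothing more until an ack arrives
        broker.ackAll();
        System.out.println(broker.pump()); // 10: the window opens again
    }
}
```

So the prefetch count bounds how many messages are in flight; it does not change the fact that the consumer callback runs once per message.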

For your purpose, you have to receive the messages, put them in a temporary list, and then insert them into the DB.

And then you can use:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

void basicAck()

Parameters: deliveryTag - the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Deliver

multiple - true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
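To make the `multiple` flag concrete, here is a small stdlib-only sketch of the bookkeeping it implies. It is not the client library's implementation; `MultipleAckSketch` and its methods are hypothetical names, and the set simply stands in for the messages outstanding on one channel.

```java
import java.util.TreeSet;

// Hypothetical model of outstanding delivery tags on a single channel.
class MultipleAckSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Record a delivery that has not been acknowledged yet.
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // multiple = true settles every tag up to and including deliveryTag;
    // multiple = false settles only deliveryTag itself.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    TreeSet<Long> outstanding() {
        return unacked;
    }

    public static void main(String[] args) {
        MultipleAckSketch channel = new MultipleAckSketch();
        for (long tag = 1; tag <= 5; tag++) {
            channel.deliver(tag);
        }
        channel.ack(3, true);
        System.out.println(channel.outstanding()); // [4, 5]
    }
}
```

This is why a batch consumer only needs the delivery tag of the last message in the batch.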

Example

final List<String> myMessages = new ArrayList<String>();
channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Buffer each delivery; messages still arrive one at a time.
        myMessages.add(new String(body, "UTF-8"));
        System.out.println("Received...");

        if (myMessages.size() >= 10) {
            // Do the bulk insert here, then acknowledge the whole batch at once.
            System.out.println("insert into DB...");
            channel.basicAck(envelope.getDeliveryTag(), true);
            myMessages.clear();
        }
    }
});
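The question also asks for a NACK when the bulk insert fails. Below is a rough, stdlib-only sketch of that decision logic, kept separate from the RabbitMQ client so it can run on its own. Everything named here is an assumption made for illustration: `BatchAckSketch`, the `Acker` interface (a stand-in for the channel's ack and nack calls), and the `bulkInsert` callback, which is assumed to throw a RuntimeException on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching helper; call onMessage once per delivery.
class BatchAckSketch {
    // Stand-in for the channel's ack/nack calls (assumed, not the real API type).
    interface Acker {
        void ack(long deliveryTag, boolean multiple);
        void nack(long deliveryTag, boolean multiple, boolean requeue);
    }

    private final int batchSize;
    private final Consumer<List<String>> bulkInsert; // assumed to throw on failure
    private final Acker acker;
    private final List<String> buffer = new ArrayList<>();

    BatchAckSketch(int batchSize, Consumer<List<String>> bulkInsert, Acker acker) {
        this.batchSize = batchSize;
        this.bulkInsert = bulkInsert;
        this.acker = acker;
    }

    void onMessage(long deliveryTag, String message) {
        buffer.add(message);
        if (buffer.size() < batchSize) {
            return; // keep collecting until the batch is full
        }
        try {
            bulkInsert.accept(new ArrayList<>(buffer));
            // Success: one cumulative ack for the whole batch.
            acker.ack(deliveryTag, true);
        } catch (RuntimeException e) {
            // Failure: reject the whole batch and ask for redelivery.
            acker.nack(deliveryTag, true, true);
        } finally {
            buffer.clear();
        }
    }
}
```

Two caveats about this sketch: it assumes the prefetch count is at least the batch size, since otherwise deliveries stop before the batch fills, and it never flushes a partial batch, so a real consumer would probably also need a timeout.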

Upvotes: 10
