Bob Horn

Reputation: 34335

How can I have my RabbitMQ consumer receive just one message without it timing out and being sent again?

We have an issue where, if a message takes too long to process, it moves from unacked back to ready and will get sent again. Is there a way to prevent that?

We have a queue consumer class that is initialized this way:

    private void Initialize()
    {
        string queueName = _configuration["QueueName"] + _configuration["QueueNameSuffixForSubscriber"];

        var factory = CreateConnectionFactory();

        var connection = factory.CreateConnection();
        _channel = connection.CreateModel();

        // The message TTL must match for QueueDeclare() to work.
        var arguments = new Dictionary<string, object>();
        arguments.Add("x-message-ttl", Convert.ToInt32(_configuration["EventBusMessageTtl"]));

        // By default, RabbitMQ dispatches all the messages to the first consumer. You can change this behaviour by
        // setting the BasicQos, this controls the no of messages a consumer can receive before it acknowledges it.
        _channel.BasicQos(0, 1, false);

        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += ConsumerReceived;

        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }

Notice this line:

    _channel.BasicQos(0, 1, false);

That's how our consumer just pulls one message at a time. HOWEVER, if that message takes longer than 2 minutes before sending the ack, RMQ will send the message again. We don't want that. (It should almost never happen that it takes more than 2 minutes to process, but we don't want to settle for almost.)

Is there a way to stop RMQ from sending the message again? I could send the ack before processing the message, but then we'd receive the next message right away and we don't want that either. We want to wait for the message to be done processing before accepting the next message.

If we could just pull from RMQ when we're ready, that would solve it.

Here is the ConsumerReceived() method:

    private void ConsumerReceived(object model, BasicDeliverEventArgs eventArgs)
    {
        try
        {
            var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
            InvokeHandlers(eventArgs, message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
        }
        finally
        {
            _channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }

Upvotes: 1

Views: 1952

Answers (1)

David L

Reputation: 33873

I agree that this seems like an ideal flow for polling instead of a consumer subscription. Typically you would not want to poll since it greatly harms throughput, but in your case, that's exactly what you want.

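    while (true)
    {
        // autoAck: false keeps the message unacked until BasicAck is called below.
        BasicGetResult result = _channel.BasicGet(queueName, autoAck: false);
        if (result == null)
        {
            // No message available at this time.
            // Sleep/delay to avoid CPU and I/O churn.
            Thread.Sleep(2000);
        }
        else
        {
            try
            {
                IBasicProperties props = result.BasicProperties;
                var message = Encoding.UTF8.GetString(result.Body.ToArray());
                // There is no BasicDeliverEventArgs when polling, so this assumes
                // InvokeHandlers is adapted to accept the BasicGetResult instead.
                InvokeHandlers(result, message);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred in processing message or invoking handlers.");
            }
            finally
            {
                // The delivery tag comes from the BasicGetResult, not from event args.
                _channel.BasicAck(result.DeliveryTag, false);
            }
        }
    }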
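
If you also need to stop the loop cleanly (for example on service shutdown), here is a minimal sketch of the same poll, process, ack cycle with a CancellationToken. To keep it self-contained it does not reference RabbitMQ.Client directly: PollingLoop, tryGet, handle, ack and idleDelay are hypothetical names of mine, and in your code the delegates would presumably wrap _channel.BasicGet(queueName, autoAck: false), InvokeHandlers and _channel.BasicAck(tag, false). Treat it as an illustration under those assumptions, not a drop-in implementation.

```csharp
using System;
using System.Threading;

public static class PollingLoop
{
    // tryGet: returns null when the queue is empty
    //         (assumed to wrap _channel.BasicGet(queueName, autoAck: false)).
    // handle: processes one message body (assumed to wrap InvokeHandlers).
    // ack:    acknowledges a delivery tag (assumed to wrap _channel.BasicAck(tag, false)).
    public static void Run(
        Func<(ulong Tag, string Body)?> tryGet,
        Action<string> handle,
        Action<ulong> ack,
        TimeSpan idleDelay,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var delivery = tryGet();
            if (delivery == null)
            {
                // Queue is empty: wait, but wake up immediately if cancellation is requested.
                token.WaitHandle.WaitOne(idleDelay);
                continue;
            }

            try
            {
                handle(delivery.Value.Body);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Error processing message: {ex.Message}");
            }
            finally
            {
                // Ack after processing, as in the original handler; the next message
                // is only fetched on the next loop iteration.
                ack(delivery.Value.Tag);
            }
        }
    }
}
```

Because the loop only calls tryGet again after the ack, at most one message is in flight at a time, which is the behaviour you described wanting.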

Upvotes: 2
