ashtondunderdale
ashtondunderdale

Reputation: 1

How to consume a singular RabbitMQ message

I am setting up a program for cyclically calling a method which reads a single message from a rabbit message queue. It should consume and process one message then return from the method.

My current setup consumes two messages on the queue and I have no idea why.

public async Task<Message> Receive(string custId, string db, ILogger logger)
{
    var factory = CreateConnectionFactory();
    var connection = await factory.CreateConnectionAsync();
    var channel = await connection.CreateChannelAsync();

    await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

    var queue = await QueueDeclareAsync(channel, db);
    if (queue.MessageCount is 0) return new Message();

    var consumer = new AsyncEventingBasicConsumer(channel);
    var messageReceived = new TaskCompletionSource<string?>(TaskCreationOptions.RunContinuationsAsynchronously);

    var consumerTag = await channel.BasicConsumeAsync(db, autoAck: false, consumer: consumer);
    consumer.ReceivedAsync += async (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var receivedMessage = Encoding.UTF8.GetString(body);

        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
        messageReceived.TrySetResult(receivedMessage);

        await channel.BasicCancelAsync(consumerTag);
    };

    var message = await messageReceived.Task;

    return await ProcessReceivedMessage(custId, message);
}

I tried using:

await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

to limit it at one message but it does not seem to work.

Upvotes: 0

Views: 40

Answers (0)

Related Questions