Reputation: 1
I am setting up a program for cyclically calling a method which reads a single message from a rabbit message queue. It should consume and process one message then return from the method.
My current setup consumes two messages on the queue and I have no idea why.
public async Task<Message> Receive(string custId, string db, ILogger logger)
{
var factory = CreateConnectionFactory();
var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
var queue = await QueueDeclareAsync(channel, db);
if (queue.MessageCount is 0) return new Message();
var consumer = new AsyncEventingBasicConsumer(channel);
var messageReceived = new TaskCompletionSource<string?>(TaskCreationOptions.RunContinuationsAsynchronously);
var consumerTag = await channel.BasicConsumeAsync(db, autoAck: false, consumer: consumer);
consumer.ReceivedAsync += async (model, ea) =>
{
var body = ea.Body.ToArray();
var receivedMessage = Encoding.UTF8.GetString(body);
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
messageReceived.TrySetResult(receivedMessage);
await channel.BasicCancelAsync(consumerTag);
};
var message = await messageReceived.Task;
return await ProcessReceivedMessage(custId, message);
}
I tried using:
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
to limit it at one message but it does not seem to work.
Upvotes: 0
Views: 40