Ravi Khambhati

Reputation: 743

Azure Service Bus ReceiveAndDelete

I have created a sample application that sends messages to a queue and then reads them back. The receiver uses "ReceiveAndDelete" mode. The sample code is below.

Create Message

private static async Task CreateMessage(string queueName, string textMessage)
{
    // create a Service Bus client 
    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {
        // create a sender for the queue 
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send
        ServiceBusMessage message = new ServiceBusMessage(textMessage);

        // send the message
        await sender.SendMessageAsync(message);
        Console.WriteLine($"Sent a single message to the queue: {queueName}");
    }
}

Receive Message

// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString();
    Console.WriteLine($"Received: {body}");

    // complete the message. messages is deleted from the queue. 
    await args.CompleteMessageAsync(args.Message);
}

// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
    Console.WriteLine(args.Exception.ToString());
    return Task.CompletedTask;
}

static async Task ReceiveMessagesAsync()
{
    var processorOptions = new ServiceBusProcessorOptions
    {
        AutoCompleteMessages = false,
        MaxConcurrentCalls = 1,
        MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
        ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
        PrefetchCount = 1
    };


    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {

        // create a processor that we can use to process the messages
        ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;

        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;
        
        // start processing 
        await processor.StartProcessingAsync();

        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();

        // stop processing 
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
}

Main Method

static string connectionString = "***";
static string queueName = "firstqueue";
static async Task Main(string[] args)
{
    try
    {
        await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
        await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");

        await ReceiveMessagesAsync();
    }
    catch (Exception ex)
    {

        throw;
    }
    Console.ReadKey();
}

Everything works, but as soon as the application calls "await processor.StartProcessingAsync();" all messages are removed from the queue, even though not all of them have been processed yet. In my example there are two messages in the queue, but when "await processor.StartProcessingAsync();" is called the message count drops to zero (both messages are dequeued) and the handler then processes them one by one. My understanding is that a message whose processing has not started should remain in the queue. In this example I would expect only one message to be removed and the second to remain visible in the queue.

Is this the expected behavior or am I missing something here?

Upvotes: 3

Views: 4243

Answers (1)

Gaurav Mantri

Reputation: 136196

Is this the expected behavior or am I missing something here?

This is the expected behavior of ReceiveAndDelete mode. Service Bus deletes the message as soon as it is sent to the client regardless of whether the client is able to process the message or not.

From this link:

This operation receives a message from a queue or subscription, and removes the message from that queue or subscription in one atomic operation.

If you want to control this behavior, fetch the messages in PeekLock mode, process each message, and then call the Complete method on the message to delete it once processing has succeeded.
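As a rough illustration of the PeekLock approach, here is a minimal sketch built on the same Azure.Messaging.ServiceBus processor API used in the question. The class name PeekLockSample and its method names are made up for this example, and the connection string and queue name are assumed to be supplied by the caller. In this mode a received message stays in the queue (locked) until the handler completes it.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper class, named for illustration only.
static class PeekLockSample
{
    // Options for a processor that locks each message instead of deleting it on receipt.
    public static ServiceBusProcessorOptions BuildOptions() => new ServiceBusProcessorOptions
    {
        ReceiveMode = ServiceBusReceiveMode.PeekLock, // the default mode, set explicitly for clarity
        AutoCompleteMessages = false,                 // the handler settles each message itself
        MaxConcurrentCalls = 1,                       // process one message at a time
        PrefetchCount = 0                             // do not fetch messages ahead of processing
    };

    public static async Task HandleAsync(ProcessMessageEventArgs args)
    {
        try
        {
            // Placeholder for the real processing work.
            Console.WriteLine($"Received: {args.Message.Body}");
        }
        catch (Exception ex)
        {
            // Processing failed: release the lock so the message can be received again.
            Console.WriteLine(ex);
            await args.AbandonMessageAsync(args.Message);
            return;
        }

        // Processing succeeded: only now is the message deleted from the queue.
        await args.CompleteMessageAsync(args.Message);
    }

    public static Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception);
        return Task.CompletedTask;
    }

    public static async Task RunAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using ServiceBusProcessor processor = client.CreateProcessor(queueName, BuildOptions());

        processor.ProcessMessageAsync += HandleAsync;
        processor.ProcessErrorAsync += HandleErrorAsync;

        await processor.StartProcessingAsync();
        Console.WriteLine("Press any key to stop processing");
        Console.ReadKey();
        await processor.StopProcessingAsync();
    }
}
```

Calling it would look like await PeekLockSample.RunAsync(connectionString, queueName); from Main. The trade-off is that a message whose lock expires before it is completed can be delivered again, so the handler should tolerate duplicates.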

UPDATE

I tried your code and here are my observations:

  1. With PrefetchCount = 1, 2 messages are fetched from the queue and deleted the first time. After that, a single message is fetched and deleted at a time. A possible explanation is that 1 message is prefetched and 1 message is fetched on request.

  2. With PrefetchCount = 0 (or omitted from processorOptions), a single message is fetched and deleted.

Please try the code below:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace SO67076189
{
    class Program
    {
        static string connectionString = "connection-string";
        static string queueName = "queue-name";
        static async Task Main(string[] args)
        {
            try
            {
                await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
                await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");

                await ReceiveMessagesAsync();
            }
            catch (Exception ex)
            {

                throw;
            }
            Console.ReadKey();
        }

        private static async Task CreateMessage(string queueName, string textMessage)
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue 
                ServiceBusSender sender = client.CreateSender(queueName);

                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage(textMessage);

                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }

        // handle received messages
        static Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");

            // no CompleteMessageAsync call here: in ReceiveAndDelete mode the message
            // has already been deleted from the queue by the time it is received
            return Task.CompletedTask;
        }

        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }

        static async Task ReceiveMessagesAsync()
        {
            var processorOptions = new ServiceBusProcessorOptions
            {
                //AutoCompleteMessages = false,
                //MaxConcurrentCalls = 1,
                //MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
                ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
                //PrefetchCount = 1
            };


            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {

                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);

                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;

                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;

                // start processing 
                await processor.StartProcessingAsync();

                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();

                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    }
}

Upvotes: 2
