Reputation: 743
So I have created sample application where an application will send message to queue and read the message. With this application I am using "ReceiveAndDelete" and below is the sample code
Create Message
private static async void CreateMessage(string queueName, string textMessage)
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the queue
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage(textMessage);
// send the message
await sender.SendMessageAsync(message);
Console.WriteLine($"Sent a single message to the queue: {queueName}");
}
}
Receive Message
// handle received messages
static async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Received: {body}");
// complete the message. messages is deleted from the queue.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
static async Task ReceiveMessagesAsync()
{
var processorOptions = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
PrefetchCount = 1
};
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a processor that we can use to process the messages
ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
Console.WriteLine("Wait for a minute and then press any key to end the processing");
Console.ReadKey();
// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
}
Main Method
static string connectionString = "***";
static string queueName = "firstqueue";
static async Task Main(string[] args)
{
try
{
await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
await ReceiveMessagesAsync();
}
catch (Exception ex)
{
throw;
}
Console.ReadKey();
}
Everything works great but as soon as the application calls "await processor.StartProcessingAsync();" all messages are read from queue even if all messages are not yet processed. In my example I have two message in queue but when "await processor.StartProcessingAsync();" is called message counts becomes zero(basically message is dequeued) and it starts processing of message one by one. To my understanding if the message has not started processing so it should be in the queue. With this example only one message should be removed from Queue and second message should be visible on the queue.
Is the expected behavior or am I missing something here?
Upvotes: 3
Views: 4243
Reputation: 136196
Is the expected behavior or am I missing something here?
This is the expected behavior of ReceiveAndDelete
mode. Service Bus deletes the message as soon as it is sent to the client regardless of whether the client is able to process the message or not.
From this link
:
This operation receives a message from a queue or subscription, and removes the message from that queue or subscription in one atomic operation.
If you want to control this behavior, you may want to fetch the messages in PeekLock
mode, process the message and then call Complete
method on message to delete that message if the processing was successful.
UPDATE
I tried your code and here are my observations:
With PrefetchCount = 1
, first time 2 messages are fetched from the queue and are deleted. After that, a single message is fetched and deleted. Possible explanation is that 1 message is pre-fetched and 1 message is fetched on request.
With PrefetchCount = 0
(or omitted from `processorOptions), a single message is fetched and deleted.
Please try the code below:
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
namespace SO67076189
{
class Program
{
static string connectionString = "connection-string";
static string queueName = "queue-name";
static async Task Main(string[] args)
{
try
{
await CreateMessage(queueName, "Message 1 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 2 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 3 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 4 to test 'ReceiveAndDelete'");
await CreateMessage(queueName, "Message 5 to test 'ReceiveAndDelete'");
await ReceiveMessagesAsync();
}
catch (Exception ex)
{
throw;
}
Console.ReadKey();
}
private static async Task CreateMessage(string queueName, string textMessage)
{
// create a Service Bus client
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a sender for the queue
ServiceBusSender sender = client.CreateSender(queueName);
// create a message that we can send
ServiceBusMessage message = new ServiceBusMessage(textMessage);
// send the message
await sender.SendMessageAsync(message);
Console.WriteLine($"Sent a single message to the queue: {queueName}");
}
}
static async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Received: {body}");
// complete the message. messages is deleted from the queue.
//await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
static async Task ReceiveMessagesAsync()
{
var processorOptions = new ServiceBusProcessorOptions
{
//AutoCompleteMessages = false,
//MaxConcurrentCalls = 1,
//MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10),
ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
//PrefetchCount = 1
};
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
// create a processor that we can use to process the messages
ServiceBusProcessor processor = client.CreateProcessor(queueName, processorOptions);
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
Console.WriteLine("Wait for a minute and then press any key to end the processing");
Console.ReadKey();
// stop processing
Console.WriteLine("\nStopping the receiver...");
await processor.StopProcessingAsync();
Console.WriteLine("Stopped receiving messages");
}
}
}
}
Upvotes: 2