Reputation: 805
I am new to Azure Service Bus. I need to push messages to a queue and then have a separate scheduled task read all active messages in that queue and bulk import them into SQL. I tried the code below, and it worked when I called it right after sending the message, but it does not work from the separate scheduled task. Why is that, and what can I use to read the messages in a batch? Or is that not possible?
    queueClient = new QueueClient(conn, queuename, ReceiveMode.ReceiveAndDelete);
    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        MaxConcurrentCalls = 1,
        AutoComplete = false
    };
    queueClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);

    public async Task ReceiveMessagesAsync(Message message, CancellationToken token)
    {
        messages.Add(message.Body.ToString());
        Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");
        await queueClient.CompleteAsync(message.SystemProperties.LockToken);
    }

    public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine(exceptionReceivedEventArgs.Exception);
        return Task.CompletedTask;
    }
Upvotes: 0
Views: 3077
Reputation: 264
It would help to know which library you are using, but I recommend the Azure.Messaging.ServiceBus NuGet package (https://www.nuget.org/packages/Azure.Messaging.ServiceBus/). With it you can do the following:
    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    class Program
    {
        static async Task Main(string[] args)
        {
            string queueName = "myqueue";
            await using var client = new ServiceBusClient("myconn");

            // Create a processor that we can use to process the messages.
            // Auto-complete is turned off because the handler settles each message itself.
            await using var processor = client.CreateProcessor(
                queueName,
                new ServiceBusProcessorOptions { AutoCompleteMessages = false });

            // Add a handler to process messages
            processor.ProcessMessageAsync += MessageHandler;
            // Add a handler to process any errors
            processor.ProcessErrorAsync += ErrorHandler;

            await processor.StartProcessingAsync();

            // Process messages for 5 minutes
            await Task.Delay(TimeSpan.FromMinutes(5));

            // Stop processing
            Console.WriteLine("Stopping the receiver...");
            await processor.StopProcessingAsync();
            Console.WriteLine("Stopped receiving messages");
        }

        private static Task ErrorHandler(ProcessErrorEventArgs arg)
        {
            // Here you can catch errors; logging them is the minimum
            Console.WriteLine(arg.Exception);
            return Task.CompletedTask;
        }

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            try
            {
                BinaryData content = args.Message.Body;
                string contentStr = content.ToString(); // This would be your data

                // Do something with the message, e.g. deserialize it and insert it into SQL

                // Complete the message only after the work above has succeeded
                await args.CompleteMessageAsync(args.Message);
            }
            catch (Exception)
            {
                // If something goes wrong, abandon the message so it can be redelivered
                await args.AbandonMessageAsync(args.Message);
            }
        }
    }
This processes messages as they arrive until the five minutes are up, which suits a scheduled task. If you want to stop after a specific number of messages instead, you can receive in an async loop and check whether that number has been reached.
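For the batch-read part of the question, a rough sketch of such a loop follows. It uses a ServiceBusReceiver from the same package rather than the processor, pulls messages in batches with ReceiveMessagesAsync, and stops once a cap is reached or the queue looks empty. The connection string and queue name are placeholders, the batch size, wait time and cap are arbitrary values chosen for illustration, and BulkInsertAsync is a hypothetical stand-in for your own SQL bulk import, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class BatchImport
{
    static async Task Main()
    {
        // Placeholders: substitute your own connection string and queue name.
        await using var client = new ServiceBusClient("myconn");
        await using ServiceBusReceiver receiver = client.CreateReceiver("myqueue");

        const int batchSize = 100;   // assumed batch size, tune for your workload
        const int maxTotal = 1000;   // assumed cap on messages handled per run
        int processed = 0;

        while (processed < maxTotal)
        {
            // Wait up to 5 seconds for at most batchSize messages.
            IReadOnlyList<ServiceBusReceivedMessage> batch =
                await receiver.ReceiveMessagesAsync(
                    Math.Min(batchSize, maxTotal - processed),
                    TimeSpan.FromSeconds(5));

            // Nothing arrived within the wait time: treat the queue as drained.
            if (batch.Count == 0)
            {
                break;
            }

            // Body is BinaryData; ToString() decodes it as UTF-8 text.
            List<string> rows = batch.Select(m => m.Body.ToString()).ToList();
            await BulkInsertAsync(rows);

            // Complete each message only after its batch has been imported,
            // so a failed import leaves the messages on the queue.
            foreach (ServiceBusReceivedMessage message in batch)
            {
                await receiver.CompleteMessageAsync(message);
            }

            processed += batch.Count;
        }

        Console.WriteLine($"Imported {processed} message(s).");
    }

    // Hypothetical helper: replace with your real bulk import (e.g. SqlBulkCopy).
    static Task BulkInsertAsync(IReadOnlyList<string> rows)
    {
        Console.WriteLine($"Would bulk insert {rows.Count} row(s).");
        return Task.CompletedTask;
    }
}
```

Importing and completing one batch at a time, instead of collecting everything first, keeps each message's lock from expiring while the import runs. If a single import can take longer than the queue's lock duration, use a smaller batch size.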
Upvotes: 1