Reputation: 471
I am building a Service Bus based application. When my receiver gets a message from the service bus, processing it may take a long time, so I would like to await the processing. If it takes too long, the processing should be canceled and the message should be completed.
Here is the code of my receiver:
client.OnMessageAsync(async message =>
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine("message from api");
    try
    {
        CancellationTokenSource source = new CancellationTokenSource();
        source.CancelAfter(TimeSpan.FromSeconds(30));
        var task = new Task(() => ProcessServiceBusMessage(message).Wait(), source.Token);
        await message.CompleteAsync();
    }
    catch (Exception ex)
    {
        errLog.Error(ex.Message + ex.StackTrace);
        await message.DeadLetterAsync();
    }
}, new OnMessageOptions() { AutoComplete = false, MaxConcurrentCalls = 20, AutoRenewTimeout = new TimeSpan(0, 5, 0) });
As you can see, I launch my ProcessServiceBusMessage function inside a Task, wait for it, and use a CancellationToken to cancel the task after 30 seconds (if the message requires more than 30 seconds to finish processing), but it doesn't work.
How can I fix it?
EDIT: Specifically, the task is not canceled after 30 seconds, and message.CompleteAsync() is executed immediately, without waiting for the task.
Thank you.
Upvotes: 0
Views: 1326
Reputation: 297
Have you considered using Azure Storage Queue instead?
By default a message is locked for 30 seconds, so there may be a conflict: Service Bus may throw a lock-expired exception before your 30-second cancellation occurs.
Upvotes: 0
Reputation: 35144
The problem is that you create a task but never start or await it, so CompleteAsync is called immediately.
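A cleaner approach is to call the method directly and wait for either it or a timeout, whichever finishes first (timeout here is a TimeSpan, e.g. TimeSpan.FromSeconds(30)):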
var task = ProcessServiceBusMessage(message);
await Task.WhenAny(task, Task.Delay(timeout));
await message.CompleteAsync();
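Note that Task.WhenAny only stops waiting; the processing itself keeps running in the background after the timeout. If you also want the work to stop, the processing method has to observe a CancellationToken. Below is a minimal sketch of that idea, not a drop-in fix: it assumes ProcessServiceBusMessage can be changed to accept a token, and ProcessAsync, RunWithTimeoutAsync, and TimeoutDemo are hypothetical names that simulate the work with Task.Delay instead of touching Service Bus.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Hypothetical stand-in for ProcessServiceBusMessage. Cancellation is
    // cooperative: the method must pass the token on (or check it) itself.
    public static async Task ProcessAsync(TimeSpan workDuration, CancellationToken token)
    {
        // Task.Delay throws TaskCanceledException once the token is cancelled.
        await Task.Delay(workDuration, token);
    }

    // Returns true if the work finished in time, false if the timeout cancelled it.
    public static async Task<bool> RunWithTimeoutAsync(TimeSpan workDuration, TimeSpan timeout)
    {
        using (var source = new CancellationTokenSource())
        {
            source.CancelAfter(timeout);
            try
            {
                await ProcessAsync(workDuration, source.Token);
                return true;
            }
            catch (OperationCanceledException)
            {
                // TaskCanceledException derives from OperationCanceledException.
                return false;
            }
        }
    }

    public static async Task Main()
    {
        // Short work, generous timeout: completes normally.
        Console.WriteLine(await RunWithTimeoutAsync(TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(2)));   // True
        // Long work, short timeout: cancelled.
        Console.WriteLine(await RunWithTimeoutAsync(TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(100))); // False
    }
}
```

In the receiver, you would await the equivalent of RunWithTimeoutAsync inside the try block and call message.CompleteAsync() afterwards in either case, since the question asks for the message to be completed even when processing times out.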
Upvotes: 2