Reputation: 93
I am trying to consume a message from a queue using a Service Bus queue trigger and run a job that takes some time to complete. I don't want another processor to pick up the message while I am processing it. My host.json configuration is below. When I receive the message from the queue and call await receiver.CompleteAsync(lockToken); I get the exception "The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue."
"serviceBus": {
    "prefetchCount": 1,
    "autoRenewTimeout": "00:05:00",
    "messageHandlerOptions": {
        "autoComplete": false,
        "maxConcurrentCalls": 1,
        "maxAutoRenewDuration": "00:04:00"
    }
}
The Azure Function code is below:
public static void Run([ServiceBusTrigger("testqueue", Connection = "AzureServiceBus.ConnectionString")]Message message, MessageReceiver messageReceiver, ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {messageReceiver.ClientId}");
    log.LogInformation($"Message={Encoding.UTF8.GetString(message.Body)}");
    string lockToken = message.SystemProperties.LockToken;
    log.LogInformation($"Processing Message:={Encoding.UTF8.GetString(message.Body)}");
    DoSomeJob(messageReceiver, lockToken, log);
}
public static async void DoSomeJob(MessageReceiver receiver, string lockToken, ILogger log)
{
    try
    {
        await Task.Delay(360000);
        await receiver.CompleteAsync(lockToken);
    }
    catch (Exception ex)
    {
        log.LogInformation($"Error In Job={ex}");
    }
}
Upvotes: 0
Views: 3582
Reputation: 25994
When you configure an Azure Function triggered by Azure Service Bus with maxAutoRenewDuration set to 10 minutes, you're asking the trigger to extend the lock for up to 10 minutes. This is not a guaranteed operation, because it is initiated on the client side and the maximum duration of a single lock is 5 minutes. Given that, an operation to extend the lock can fail, and the lock will then be released, causing another instance of your function to process the message concurrently while the original processing is still happening.
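To make the renewal behaviour concrete, here is a minimal sketch of the question's function rewritten so that the long-running work is awaited inside the invocation. It rests on an assumption that is not stated above: that the trigger only attempts lock renewal while the function invocation is still running, so an `async void` helper that outlives `Run` would not be covered. The class name, the function name, and `DoSomeJobAsync` are hypothetical; the other types come from the Microsoft.Azure.ServiceBus package the question already uses.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class LongRunningQueueFunction // hypothetical name
{
    [FunctionName("LongRunningQueueFunction")] // hypothetical name
    public static async Task Run(
        [ServiceBusTrigger("testqueue", Connection = "AzureServiceBus.ConnectionString")] Message message,
        MessageReceiver messageReceiver,
        ILogger log)
    {
        string lockToken = message.SystemProperties.LockToken;
        log.LogInformation($"Processing message: {Encoding.UTF8.GetString(message.Body)}");

        try
        {
            // Awaiting keeps the invocation alive for the whole job
            // (assumption: renewal is only attempted during the invocation).
            await DoSomeJobAsync();

            // autoComplete is false in host.json, so complete explicitly.
            await messageReceiver.CompleteAsync(lockToken);
        }
        catch (MessageLockLostException ex)
        {
            // The lock expired or a renewal failed; the message will be redelivered.
            log.LogWarning(ex, "Lock lost before the message could be completed.");
        }
    }

    // Hypothetical stand-in for the real work; mirrors the 6-minute delay in the question.
    private static Task DoSomeJobAsync() => Task.Delay(TimeSpan.FromMinutes(6));
}
```

Even with this shape, a job that runs for 6 minutes is longer than the 00:04:00 maxAutoRenewDuration shown in the question, so the renewal window would also need to cover the expected processing time. Renewal remains best-effort either way.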
Another aspect to look at is prefetchCount, which is set to 100, and maxConcurrentCalls, which is set to 32. This means you're fetching up to 100 messages and processing up to 32 of them at a time. I don't know if the actual Function code runs longer than 50 seconds (in your example), but prefetched message locks are not auto-renewed. Therefore, if the prefetched messages are not processed within the queue's MaxLockDuration (which by default is less than 5 minutes), some of them will go through processing, optional renewal, and completion well after they've lost the lock.
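As an illustration of the prefetch point, a conservative host.json fragment for a single long-running message might look like the sketch below. It uses only keys that appear in the question. The values are illustrative assumptions, not tested recommendations: a prefetchCount of 0 leaves prefetching off so no message sits locked in a local buffer, and maxAutoRenewDuration is set longer than the expected processing time.

```json
"serviceBus": {
    "prefetchCount": 0,
    "messageHandlerOptions": {
        "autoComplete": false,
        "maxConcurrentCalls": 1,
        "maxAutoRenewDuration": "00:10:00"
    }
}
```

With a single concurrent call and no prefetch, the only lock in play is the one on the message currently being processed, which makes lock-loss errors easier to reason about.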
I would recommend:
- Making sure MaxLockDuration is not too short to accommodate your prefetch and concurrency.
- Adjusting prefetchCount to ensure you don't over-fetch.
Upvotes: 2