Reputation: 2899
I have a process that reads a message from an Azure Service Bus queue and converts that message to a video to be encoded by Azure Media Services. I noticed that if the process is kicked off several times in quick succession, the same video is encoded several times, one run right after another. Here is my code that adds the video to the queue:
public class VideoManager
{
    string _connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
    string _queueName = ConfigurationManager.AppSettings["ServiceBusQueueName"];
    QueueClient _client;

    public VideoManager()
    {
        var conStringBuilder = new ServiceBusConnectionStringBuilder(_connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };
        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());
        _client = messagingFactory.CreateQueueClient(_queueName);
    }

    public void Approve(Video video)
    {
        // Set video to approved.
        video.ApprovalStatus = ApprovalStatus.Approved;
        var message = new BrokeredMessage(new VideoMessage(video, VideoMessage.MessageTypes.Approve, string.Empty));
        message.MessageId = video.RowKey;
        _client.Send(message);
    }
}
And here is the process that reads from the queue:
class Program
{
    static QueueClient client;

    static void Main(string[] args)
    {
        VideoManager videoManager = new VideoManager();
        var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        var conStringBuilder = new ServiceBusConnectionStringBuilder(connectionString)
        {
            OperationTimeout = TimeSpan.FromMinutes(120)
        };
        var messagingFactory = MessagingFactory.CreateFromConnectionString(conStringBuilder.ToString());
        client = messagingFactory.CreateQueueClient(ConfigurationManager.AppSettings["ServiceBusQueueName"]);
        Console.WriteLine("Starting: Broadcast Center Continuous Video Processing Job");
        OnMessageOptions options = new OnMessageOptions
        {
            MaxConcurrentCalls = 25,
            AutoComplete = false
        };
        client.OnMessageAsync(async message =>
        {
            bool shouldAbandon = false;
            try
            {
                await HandleMessage(message);
            }
            catch (Exception ex)
            {
                shouldAbandon = true;
                Console.WriteLine(ex.Message);
            }
            if (shouldAbandon)
            {
                await message.AbandonAsync();
            }
        }, options);
        while (true) { }
    }

    async static Task<int> HandleMessage(BrokeredMessage message)
    {
        VideoMessage videoMessage = message.GetBody<VideoMessage>();
        Console.WriteLine(String.Format("Message body: {0}", videoMessage.Video.Title));
        Console.WriteLine(String.Format("Message id: {0}", message.MessageId));
        VideoProcessingService vp = new VideoProcessingService(videoMessage.Video);
        Task task;
        switch (videoMessage.MessageType)
        {
            case VideoMessage.MessageTypes.CreateThumbnail:
                task = new Task(() => vp.ProcessThumbnail(videoMessage.TimeStamp));
                task.Start();
                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());
                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            case VideoMessage.MessageTypes.Approve:
                task = new Task(() => vp.Approve());
                task.Start();
                while (!task.IsCompleted)
                {
                    await Task.Delay(15000);
                    message.RenewLock();
                }
                await task;
                Console.WriteLine(task.Status.ToString());
                Console.WriteLine("Processing Complete");
                Console.WriteLine("Awaiting Message");
                break;
            default:
                break;
        }
        return 0;
    }
}
If I kick off the process 3 times in a row, this is what I see in the console window:
Message id: 76aca19a-0698-449b-bf58-a24876fc4314
Message id: 76aca19a-0698-449b-bf58-a24876fc4314
Message id: 76aca19a-0698-449b-bf58-a24876fc4314
I thought maybe I did not have the settings correct, but they are there. I am really at a loss here, as I would expect this to be fairly out-of-the-box behavior. Does duplicate detection only work once the message has been completed, meaning I can't use OnMessageAsync()?
Upvotes: 0
Views: 500
Reputation: 25994
The issue is not the completion (as it was in the code), but the fact that you have, in essence, multiple consumers (25 concurrent callbacks), and it seems the LockDuration elapses before processing finishes. As a result, the message reappears on the queue and is processed again, which is why you see the same message ID logged more than once.
Possible solutions are (as I've outlined in a comment above):
BrokeredMessage.RenewLock
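To illustrate the lock-renewal idea, here is a minimal sketch of a helper that keeps renewing while the work runs and stops as soon as the work finishes. The names LockKeeper and RunWithRenewalAsync are hypothetical (they are not part of the Service Bus SDK), and the renewal is passed in as a delegate so the helper does not depend on any particular client library. The interval has to be comfortably shorter than the queue's LockDuration for this to help.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Service Bus SDK.
public static class LockKeeper
{
    // Runs `work` on the thread pool and calls `renewLock` every `interval`
    // until the work finishes. Returns the number of renewals performed.
    public static async Task<int> RunWithRenewalAsync(
        Action work, Func<Task> renewLock, TimeSpan interval)
    {
        int renewals = 0;
        Task workTask = Task.Run(work);

        while (true)
        {
            // Wake up when either the work is done or the interval has elapsed.
            Task finished = await Task.WhenAny(workTask, Task.Delay(interval));
            if (finished == workTask)
            {
                break;
            }

            await renewLock();
            renewals++;
        }

        // Rethrows the exception if the work failed, so the caller can abandon.
        await workTask;
        return renewals;
    }
}
```

Inside HandleMessage this could be used roughly as `await LockKeeper.RunWithRenewalAsync(() => vp.Approve(), () => message.RenewLockAsync(), TimeSpan.FromSeconds(15));` followed by `await message.CompleteAsync();` once the work has succeeded. With AutoComplete = false, nothing else settles the message, so it becomes visible again when the lock is no longer renewed.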
Upvotes: 2
Reputation: 415
There is a line of code missing from your HandleMessage code.
async static Task<int> HandleMessage(BrokeredMessage message)
{
    VideoMessage videoMessage = message.GetBody<VideoMessage>();
    await message.CompleteAsync(); // This line...
    Console.WriteLine(String.Format("Message id: {0}", message.MessageId));
    // Processes Message
}
So yes, you have to settle the message explicitly with Complete, Defer, etc.
Also see this answer. I also found this, which may be useful for understanding how duplicate detection works.
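For context on the duplicate-detection part, here is a rough sketch of the queue settings involved, assuming the WindowsAzure.ServiceBus NuGet package (the Microsoft.ServiceBus namespaces used in the question). The class and method names QueueSetup and EnsureQueue, and the 10-minute window, are illustrative assumptions. Duplicate detection discards a second message sent with the same MessageId inside the history window. It does not stop a message that was already accepted from being delivered again after its lock expires, which is a separate mechanism governed by LockDuration.

```csharp
using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical setup helper; requires the WindowsAzure.ServiceBus package.
public static class QueueSetup
{
    public static void EnsureQueue(string connectionString, string queueName)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        if (namespaceManager.QueueExists(queueName))
        {
            // RequiresDuplicateDetection cannot be changed on an existing queue.
            return;
        }

        var description = new QueueDescription(queueName)
        {
            // Drop messages whose MessageId was already seen within the window.
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10), // assumed value
            // How long a receiver holds a message before it is redelivered.
            // Five minutes is the maximum the service allows.
            LockDuration = TimeSpan.FromMinutes(5)
        };
        namespaceManager.CreateQueue(description);
    }
}
```

Because the sender in the question sets MessageId to video.RowKey, a repeated Approve for the same video inside the window would be dropped at send time, while repeated log lines for one message on the receiving side point at lock expiry or a missing Complete.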
Upvotes: 1