KingOfHypocrites
KingOfHypocrites

Reputation: 9547

How to Bulk Process Messages from Service Bus Queue Every X Seconds

My service bus queue previously processed one message at a time. I wanted to change it to allow the messages to queue up so I could process them in bulk every X seconds. I initially did this:

var messagingFactorySettings = new MessagingFactorySettings
{
    NetMessagingTransportSettings = { 
        BatchFlushInterval = TimeSpan.FromMilliseconds(10000) },
        TokenProvider = credentials
};

Now I want to receive the messages, but seems I have to do this infinite while loop to get at them:

while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)
{
    foreach (var message in messages)
    { ...

It was my understanding that the BatchFlushInterval would allow the requests to build up for X time so that I am not receiving the messages one by one but rather in bulk. Therefore I'm not sure why I can't do something like I did previously:

myQueueClient.OnMessage((m) =>
{

But a bulk version:

myQueueClient.OnBulkMessage((listOfMessages) =>
{

Am I missing something or is constantly polling the only way to accomplish this? Also my BatchFlushInterval seems to be ignored. I would expected that it would only check for new messages every 10 seconds but it receives the first batch immediately and any new messages that come in get processed immediately as well.

Assuming I want to every X (i.e: 1) seconds pull up to Y messages (i.e.: 1000) off the queue and process them at once how would I do this? Why is BatchFlushInterval not having any affect?

Upvotes: 1

Views: 2609

Answers (1)

glenebob
glenebob

Reputation: 1983

It would seem that a simple Thread.Sleep(x) is in order.

I found the problem of pausing until some total time has elapsed interesting, so I went off and implemented a stopwatch subclass that makes that problem solvable in a cleaner, more readable way.

while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)
{
    var sw = WaitableStopwatch.StartNew();

    // ReceiveBatch() return IEnumerable<>. No need for .ToList().
    foreach (var message in messages)
    {
        ...
    }

    // If processing took less than 10 seconds, sleep
    // for the remainder of that time span before getting
    // the next batch.
    sw.Wait(Timespan.FromSeconds(10));
}



/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch
{
    /// <summary>
    /// Initializes a new WaitableStopwatch instance, sets the elapsed
    /// time property to zero, and starts measuring elapsed time.
    /// </summary>
    /// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
    public static new WaitableStopwatch StartNew()
    {
        WaitableStopwatch sw = new WaitableStopwatch();

        sw.Start();

        return sw;
    }

    /// <summary>
    /// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public void Wait(int elapsedMilliseconds)
    {
        Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public void Wait(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            Thread.Sleep(diff);
        }
    }

    /// <summary>
    /// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public Task WaitAsync(int elapsedMilliseconds)
    {
        return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public async Task WaitAsync(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            await Task.Delay(diff);
        }
    }
}

Upvotes: 1

Related Questions