VulgarBinary

Reputation: 3589

Azure ServiceBus OnMessage Blocking Call or Not?

We are using Azure Service Bus queues to process high volumes of client requests. The OnMessage call appears to be a blocking call, but if it is, how long it blocks is highly inconsistent.

What I am attempting to accomplish is to watch a queue permanently from a web service application (so that metrics can be mined from the running application).

I am creating the subscription below:

protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
    var client = GetClient(queueName, PollingTimeout);
    var transformCallback = new Action<BrokeredMessage>((message) =>
    {
        try
        {
            var request = message.ToQueueRequest();
            if (callback(request))
            {
                message.Complete();
            }
            else
            {
                message.Abandon();
            }
        }
        catch (Exception ex)
        {
            //TODO: Log the error
            message.Abandon();
        }
    });
    var options = new OnMessageOptions
    {
        MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
        AutoComplete = false
    };
    options.ExceptionReceived += OnMessageError;
    client.OnMessage(transformCallback, options);
}

If I call Subscribe only once, the application stops watching the queue and thus stops processing messages. However, if I place a while loop around my Subscribe call, messages continue to be processed. So, with great hesitation, I wrote the snippet below to resubscribe whenever OnMessage returns.

protected void MonitorQueue()
{
    IsRunning = true;
    while (IsRunning)
    {
        try
        {
            Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
            QueueClient.Subscribe(QueueName, Processor);
            Log.Info("MonitoringThread:  OnMessage ended logging for {0}", QueueName);
        }
        catch (Exception ex)
        {
            IsRunning = false;
            Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
        }

        if (SleepBeforeReinit > 0 && IsRunning)
        {
            Thread.Sleep(SleepBeforeReinit);
        }
    }
}

This fixed the problem of messages expiring to the dead-letter queue because they were never picked up, but it caused other problems.

Because OnMessage is a billable operation, I am concerned when the log file shows the subscription for a queue beginning and ending less than a second apart, and the log file grows very rapidly.

I have set the OperationTimeout on my MessagingFactory to 1 day, yet this does not seem to affect how often the subscription opens and closes, as I would have expected.

I have seen plenty of samples that do this in a worker role, but that will not accomplish what we are trying to do. I am currently wiring this up from the Global.asax.cs of our web application. Any advice is greatly appreciated!

Upvotes: 3

Views: 1569

Answers (2)

VulgarBinary

Reputation: 3589

OnMessage and OnMessageAsync are NOT blocking calls. They need to be called only once; after that, the client keeps receiving from the queue until the application is terminated.

See related post for further details: Azure Service Bus, determine if OnMessage stops processing
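
To illustrate the "call it once" point, here is a minimal sketch, assuming the WindowsAzure.ServiceBus NuGet package (the Microsoft.ServiceBus.Messaging namespace). The QueueListener class, its constructor parameters, and the handler signature are hypothetical names for illustration and are not from the original code.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper: registers the message pump once and keeps the client alive.
public sealed class QueueListener : IDisposable
{
    private readonly QueueClient _client;

    public QueueListener(string connectionString, string queueName)
    {
        _client = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    // Call this once, for example from Application_Start.
    // OnMessage returns immediately; the SDK keeps invoking the callback
    // on its own threads for as long as the client stays open.
    public void Start(Func<BrokeredMessage, bool> handler, int maxConcurrentCalls)
    {
        var options = new OnMessageOptions
        {
            AutoComplete = false,
            MaxConcurrentCalls = maxConcurrentCalls
        };

        // Errors raised by the pump itself (receive failures, lost locks, ...) surface here.
        options.ExceptionReceived += (sender, e) =>
            Console.Error.WriteLine("{0}: {1}", e.Action, e.Exception);

        _client.OnMessage(message =>
        {
            try
            {
                if (handler(message))
                {
                    message.Complete();
                }
                else
                {
                    message.Abandon();
                }
            }
            catch (Exception)
            {
                // Release the lock so the message can be redelivered.
                message.Abandon();
            }
        }, options);
    }

    // Closing the client stops the pump; call this from Application_End.
    public void Dispose()
    {
        _client.Close();
    }
}
```

In a web application, the listener would typically be held in a static field of Global.asax.cs so that it lives as long as the application does. No while loop around the registration is needed.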

Upvotes: 1

Dominic Betts

Reputation: 2331

Your first approach is correct, and the value of MaxThreadsPerQueue determines how many threads are available to handle your messages. It sounds as if these threads are being used up (maybe the processing of your messages by the callback is taking some time?).

You should instead consider using the QueueClient.OnMessageAsync Method to receive your messages.
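
As a rough sketch of what that could look like (assuming the WindowsAzure.ServiceBus package; the AsyncQueueSubscriber class and the Task-returning handler are hypothetical names for illustration, not part of the question's code):

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper showing an asynchronous callback registration.
public static class AsyncQueueSubscriber
{
    public static void Subscribe(
        QueueClient client,
        Func<BrokeredMessage, Task<bool>> handler,
        int maxConcurrentCalls)
    {
        var options = new OnMessageOptions
        {
            AutoComplete = false,
            MaxConcurrentCalls = maxConcurrentCalls
        };

        // OnMessageAsync also returns immediately; the callback is awaited
        // per message, so a slow handler does not tie up a thread while it
        // waits on I/O.
        client.OnMessageAsync(async message =>
        {
            bool handled;
            try
            {
                handled = await handler(message);
            }
            catch (Exception)
            {
                // Treat a failing handler like a rejected message.
                handled = false;
            }

            if (handled)
            {
                await message.CompleteAsync();
            }
            else
            {
                await message.AbandonAsync();
            }
        }, options);
    }
}
```

With an asynchronous handler, MaxConcurrentCalls limits how many messages are in flight at once rather than how many threads are blocked, which may help if the processing callback is slow.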

Upvotes: 0
