olatunjee

Reputation: 361

Processing Messages in Parallel in Azure Service Bus

Problem: I have a lot of emails to send; at present there are, on average, 10 emails in the queue at any point in time. My code processes the queue one message at a time: it receives the message, processes it, and eventually sends the email. This causes a considerable delay in sending emails to users when they sign up for the service.

I've begun to think about modifying the code to process the messages in parallel, say 5 at a time, asynchronously. I'm imagining writing a method and using the CTP to call this method in parallel, say, 5 times.

I'm a little lost on how to implement this. The cost of making a mistake is very high, as users will be disappointed if things go wrong.

Request: I need help writing code that processes messages in Azure Service Bus in parallel. Thanks.

My code in a nutshell.

Public .. Run()
{
   _myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient);
}

void ProcessUrgentEmails(IAsyncResult result)
{
   // Cast the `result` as a QueueClient
   // Used EndReceive to get a BrokeredMessage object
   // I processed the message, then called
   sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
}


// This method is never called despite being passed as the callback above.
void ProcessEndComplete(IAsyncResult result)
{
    Trace.WriteLine("ENTERED ProcessEndComplete method...");
    var bm = result.AsyncState as BrokeredMessage;
    bm.EndComplete(result);
}

Upvotes: 5

Views: 17074

Answers (1)

Daniel

Reputation: 1484

This page gives you performance tips when using Windows Azure Service Bus.

For parallel processing, you could keep a pool of threads: every time you receive a message, you take a thread from that pool and hand it the message. You need to manage that pool yourself.
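To make the pool idea concrete, here is a minimal sketch of one way to cap the number of concurrent workers. It is an illustration, not Service Bus code: the in-memory ConcurrentQueue<string> stands in for the real queue, and the names BoundedWorkerSketch, Drain and handler are hypothetical. A SemaphoreSlim plays the role of the pool, handing out a fixed number of worker slots.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedWorkerSketch
{
    // Processes every item in 'queue', running at most 'maxWorkers' handlers at once.
    // Returns the highest number of handlers observed running at the same time.
    // NOTE: 'queue' is an in-memory stand-in for the Service Bus queue (assumption).
    public static int Drain(ConcurrentQueue<string> queue, int maxWorkers, Action<string> handler)
    {
        var slots = new SemaphoreSlim(maxWorkers); // one permit per worker slot
        var running = new List<Task>();
        var gate = new object();
        int active = 0, peak = 0;

        string message;
        while (queue.TryDequeue(out message))
        {
            slots.Wait();             // block until a worker slot is free
            string current = message; // per-iteration copy captured by the lambda
            running.Add(Task.Run(() =>
            {
                lock (gate) { active++; if (active > peak) peak = active; }
                try
                {
                    handler(current);
                }
                finally
                {
                    lock (gate) { active--; }
                    slots.Release();  // give the slot back even if the handler threw
                }
            }));
        }

        // Rethrows handler failures, wrapped in an AggregateException.
        Task.WaitAll(running.ToArray());
        return peak;
    }
}
```

With maxWorkers set to 5, this would match the "5 at a time" idea in the question. With a real queue, the dequeue step would be replaced by a receive call, and each message would be completed only after its handler succeeds.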

Or, you could retrieve multiple messages at once and process them using the TPL (PLINQ). For example, BeginReceiveBatch/EndReceiveBatch let you retrieve multiple messages from the queue asynchronously; you can then call "AsParallel" on the IEnumerable they return and process the messages on multiple threads.

VERY simple and BARE BONES sample:

var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);
});

That code retrieves up to 3 messages from the queue and processes them in "3 threads" (Note: it is not guaranteed to use 3 threads; .NET looks at the available system resources and uses up to 3 threads if necessary).

You could also remove the "WithDegreeOfParallelism" part, and .NET will use however many threads it decides it needs.
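One thing the bare-bones sample leaves out is what happens when processing a single message fails: an exception thrown inside ForAll surfaces as an AggregateException for the whole batch. The following sketch shows one way to handle failures per item. It uses plain strings in place of BrokeredMessage so it runs on its own, and the names PlinqBatchSketch, ProcessBatch and handler are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class PlinqBatchSketch
{
    // Runs 'handler' over 'batch' with PLINQ, using at most 'degree' concurrent tasks.
    // Returns the items whose handler threw, so the caller can decide what to do with them.
    // NOTE: strings stand in for BrokeredMessage instances (assumption).
    public static string[] ProcessBatch(string[] batch, int degree, Action<string> handler)
    {
        var failed = new ConcurrentBag<string>();

        batch.AsParallel()
             .WithDegreeOfParallelism(degree)
             .ForAll(item =>
             {
                 try
                 {
                     handler(item);
                 }
                 catch (Exception)
                 {
                     // Record the failure instead of letting it abort the whole batch.
                     failed.Add(item);
                 }
             });

        return failed.ToArray();
    }
}
```

In the Service Bus case, the items returned here would presumably be the messages you do not complete (for example, by abandoning them so they become available again), while the others are completed as usual.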

At the end of the day there are multiple ways to do it; you have to decide which one works better for you.

UPDATE: Sample without using ASYNC/AWAIT

This is a basic sample (without error checking) using the regular Begin/End async pattern.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "QUEUE_NAME";
        const int MaxThreads = 3;

        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;
        bool IsStopped;
        int dequeueRequests = 0;

        public override void Run()
        {
            while (!IsStopped)
            {
                // Increment Request Counter
                Interlocked.Increment(ref dequeueRequests);

                Trace.WriteLine(dequeueRequests + " request(s) in progress");

                Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client);

                // If we have made too many requests, wait for them to finish before requesting again.
                while (dequeueRequests >= MaxThreads && !IsStopped)
                {
                    System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work");
                    Thread.Sleep(2000);
                }

            }
        }


        void ProcessUrgentEmails(IAsyncResult result)
        {
            var qc = result.AsyncState as QueueClient;
            var sendEmail = qc.EndReceive(result);
            // We have either received a message or timed out... either way we decrease our counter
            Interlocked.Decrement(ref dequeueRequests);

            // If we have a message, process it
            if (sendEmail != null)
            {
                var r = new Random();
                // Process the message (the random sleep below only simulates the work)
                Trace.WriteLine("Processing message: " + sendEmail.MessageId);
                System.Threading.Thread.Sleep(r.Next(10000));

                // Mark it as completed
                sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
            }

        }


        void ProcessEndComplete(IAsyncResult result)
        {
            var bm = result.AsyncState as BrokeredMessage;
            bm.EndComplete(result);
            Trace.WriteLine("Completed message: " + bm.MessageId);
        }


        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Create the queue if it does not exist already
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }

            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
            IsStopped = false;
            return base.OnStart();
        }

        public override void OnStop()
        {
            // Tell Run() to stop issuing new receive requests first;
            // otherwise the counter below may never reach zero
            IsStopped = true;

            // Wait for all requests to finish (or time out) before closing
            while (dequeueRequests > 0)
            {
                System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping");
                Thread.Sleep(2000);
            }
            // Close the connection to Service Bus Queue
            Client.Close();
            base.OnStop();
        }
    }
}

Hope it helps.

Upvotes: 7
