Dead.Rabit

Run a WebJob in parallel

We have a series of 4 Service Bus queues; each queue has a WebJob that processes its messages and passes them on to the next queue. Although we're running on a single core, each WebJob is async, so the other jobs can continue while it waits on a database or endpoint.

We have set MaxConcurrentCalls = 3 in the ServiceBusConfiguration.

However, now that all the messages are in the final queue, it isn't spinning up multiple instances of the final WebJob to process them faster; instead it processes them one at a time. How do I configure my WebJobs so that the same WebJob runs in parallel?

I noticed this article from 2014 suggesting that we have to implement our own parallel processing, but more recent articles contradict it, saying parallel processing is supported out of the box.

Answers (2)

Dead.Rabit

By setting ServiceBusConfiguration.PrefetchCount and ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls, I can see a single WebJob dequeue multiple messages and process them in parallel.
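To see why this helps even on a single core, here is a minimal sketch in plain .NET with no WebJobs SDK involved. SimulatedPump and RunAsync are hypothetical names, Task.Delay stands in for the database or endpoint call, and a SemaphoreSlim plays the role of MaxConcurrentCalls. The real SDK message pump is more involved, and it also uses PrefetchCount to pull messages ahead of time, which this sketch does not model.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical simulation of a message pump with a MaxConcurrentCalls-style limit.
// This is NOT the WebJobs SDK; it only illustrates bounded concurrent handling.
public static class SimulatedPump
{
    // Handles every message with at most maxConcurrentCalls handlers in flight and
    // returns the highest number of handlers observed running at the same time.
    public static async Task<int> RunAsync(IEnumerable<int> messages, int maxConcurrentCalls, TimeSpan ioDelay)
    {
        using var gate = new SemaphoreSlim(maxConcurrentCalls);
        int inFlight = 0;
        int peak = 0;

        List<Task> handlers = messages.Select(async message =>
        {
            await gate.WaitAsync();                 // wait for a free "call slot"
            int now = Interlocked.Increment(ref inFlight);
            try
            {
                // Record the peak with a lock-free compare-and-swap loop.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                {
                    Interlocked.CompareExchange(ref peak, now, seen);
                }
                // Stand-in for an awaited database/HTTP call: the thread is released
                // while waiting, so other handlers progress even on a single core.
                await Task.Delay(ioDelay);
            }
            finally
            {
                Interlocked.Decrement(ref inFlight);
                gate.Release();
            }
        }).ToList();

        await Task.WhenAll(handlers);
        return peak;
    }
}
```

Calling SimulatedPump.RunAsync(Enumerable.Range(1, 10), 3, TimeSpan.FromMilliseconds(100)) should report a peak of 3: the handlers overlap because each one awaits instead of blocking the thread.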

Joey Cai

Only continuous WebJobs can scale out to multiple instances; the scale setting determines whether the program or script runs on all instances or on just one.

The option to run on multiple instances doesn't apply to the Free or Shared pricing tiers.

In your WebJob's Main method you will typically find an instance of JobHostConfiguration, which is used to configure the properties of your WebJob.

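Here is an example configuration. Note that the config.Queues settings apply to Azure Storage queue triggers; Service Bus triggers are tuned through ServiceBusConfiguration instead: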

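using System;
using Microsoft.Azure.WebJobs;  // UseTimers() comes from the Microsoft.Azure.WebJobs.Extensions package

static void Main()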
{
    var config = new JobHostConfiguration();
    config.UseTimers();
    config.Queues.MaxDequeueCount = 2;
    config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);
    config.Queues.BatchSize = 2;
    var host = new JobHost(config);
    host.RunAndBlock();
}

Let's break this down piece by piece:

config.UseTimers();

config.UseTimers() enables the timer trigger (TimerTrigger) in our functions.

config.Queues.MaxDequeueCount = 2;

MaxDequeueCount is the number of times your function will try to process a message that keeps failing; after that, the message is moved to a poison queue.
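A minimal sketch of that retry behaviour, assuming a handler that simply succeeds or fails on each attempt. PoisonSimulation and Process are hypothetical names, and the real host also deals with message visibility timeouts, which are not modelled here.

```csharp
using System;

// Hypothetical model of retry-then-poison handling, loosely mirroring how the
// WebJobs SDK treats a Storage queue message that keeps failing.
public static class PoisonSimulation
{
    // handler receives the current dequeue count and returns true on success.
    // Returns how many attempts were made and whether the message was poisoned.
    public static (int Attempts, bool Poisoned) Process(Func<int, bool> handler, int maxDequeueCount)
    {
        int dequeueCount = 0;
        while (true)
        {
            dequeueCount++;                       // each pickup increments the dequeue count
            if (handler(dequeueCount))
            {
                return (dequeueCount, false);     // success: the message is deleted
            }
            if (dequeueCount >= maxDequeueCount)
            {
                return (dequeueCount, true);      // give up: move to the poison queue
            }
            // Otherwise the message becomes visible again and is retried.
        }
    }
}
```

With a handler that always fails, Process(_ => false, 2) returns (2, true): the function runs twice and the message is then poisoned.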

config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(4);

MaxPollingInterval is the maximum time the WebJob waits between checks of an empty queue (while the queue stays empty, the SDK backs off gradually, by default up to one minute). If that delay is not acceptable, lower this setting as above so that the WebJob checks the queue at least every 4 seconds.
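The back-off can be pictured with the simplified, deterministic sketch below. It is an assumption for illustration only: the real SDK randomizes its delays, and the minimum interval passed in here is hypothetical. PollingBackoff and EmptyQueueDelays are made-up names.

```csharp
using System;
using System.Collections.Generic;

// Simplified, deterministic model of queue polling back-off.
// Assumption: the real SDK adds randomness; the minimum interval is hypothetical.
public static class PollingBackoff
{
    // Returns the wait before each successive poll while the queue stays empty.
    public static List<TimeSpan> EmptyQueueDelays(TimeSpan minimum, TimeSpan maximum, int polls)
    {
        var delays = new List<TimeSpan>();
        var current = minimum;
        for (int i = 0; i < polls; i++)
        {
            delays.Add(current);
            // Double the wait after every empty poll, but never exceed the cap.
            var doubled = TimeSpan.FromTicks(current.Ticks * 2);
            current = doubled > maximum ? maximum : doubled;
        }
        return delays;
    }
}
```

With a 1-second minimum and the 4-second cap from the configuration above, five empty polls wait 1, 2, 4, 4 and 4 seconds.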

config.Queues.BatchSize = 2;

The BatchSize property is the number of queue messages your WebJob retrieves together and processes in parallel.

So if there are 2 items in the queue, they will be processed in parallel. If you set it to 1, you get a synchronous flow, because only one item is taken out of the queue at a time.
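The batching idea can be sketched as follows, assuming a simple in-memory queue and a handler that returns a Task (BatchSimulation and RunAsync are hypothetical names). One simplification: this sketch waits for a whole batch to finish before fetching the next one, whereas the real host may fetch more messages before a batch fully drains.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical model of BatchSize: pull up to batchSize messages, handle them
// concurrently, then pull the next batch. Not the WebJobs SDK implementation.
public static class BatchSimulation
{
    // Returns the size of each batch that was handled, in order.
    public static async Task<List<int>> RunAsync(Queue<string> queue, int batchSize, Func<string, Task> handler)
    {
        if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batchSizes = new List<int>();
        while (queue.Count > 0)
        {
            // Take up to batchSize messages off the queue in one go.
            var batch = new List<string>();
            while (batch.Count < batchSize && queue.Count > 0)
            {
                batch.Add(queue.Dequeue());
            }
            batchSizes.Add(batch.Count);

            // Start every handler in the batch, then wait for all of them.
            await Task.WhenAll(batch.Select(handler));
        }
        return batchSizes;
    }
}
```

For five queued messages, a batch size of 2 gives batches of 2, 2 and 1, while a batch size of 1 gives five batches of one, which is the synchronous flow described above.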

For more detail, refer to this article on running WebJobs in parallel.

Update:

The BeginReceiveBatch/EndReceiveBatch methods let you retrieve multiple messages from the queue asynchronously; you can then call AsParallel on the returned IEnumerable to process the messages on multiple threads.

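// Client is a QueueClient (Microsoft.ServiceBus.Messaging); ProcessMessage is your own handler.
// ReceiveBatch may return fewer than 3 messages (or none) if fewer are available.
var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(
    Client.BeginReceiveBatch(3, null, null),
    Client.EndReceiveBatch);

messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);   // in PeekLock mode, call item.Complete() once processing succeeds
});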

That code retrieves up to 3 messages from the queue and processes them on up to 3 threads. WithDegreeOfParallelism sets a maximum, not a guarantee: PLINQ uses at most 3 concurrent tasks and may use fewer depending on the available system resources.
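To check the "up to 3 threads" behaviour yourself, this self-contained sketch counts how many items are being processed at the same moment. PlinqDemo and PeakConcurrency are hypothetical names, and Thread.Sleep stands in for a blocking ProcessMessage call.

```csharp
using System;
using System.Linq;
using System.Threading;

// Illustrates that WithDegreeOfParallelism is an upper bound on concurrency.
// Items are plain integers; the "work" is a simulated blocking call.
public static class PlinqDemo
{
    // Returns the largest number of items observed being processed at once.
    public static int PeakConcurrency(int itemCount, int degree)
    {
        int inFlight = 0;
        int peak = 0;
        Enumerable.Range(0, itemCount)
            .AsParallel()
            .WithDegreeOfParallelism(degree)
            .ForAll(_ =>
            {
                int now = Interlocked.Increment(ref inFlight);
                // Record the peak with a lock-free compare-and-swap loop.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                {
                    Interlocked.CompareExchange(ref peak, now, seen);
                }
                Thread.Sleep(20); // simulated blocking work, like a synchronous ProcessMessage
                Interlocked.Decrement(ref inFlight);
            });
        return peak;
    }
}
```

PeakConcurrency(12, 3) should never report more than 3. Because the work here blocks a thread, PLINQ suits CPU-bound or synchronous handlers; for awaited I/O, a SemaphoreSlim-bounded set of tasks keeps threads free while waiting.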

For more details, refer to this related case.
