keshav

Reputation: 41

Parallel processing using concurrent collection

I currently have a function that performs a set of 10 tasks in parallel. After those 10 tasks complete, I move on to the next 10, until my queue is empty. I would like to make that algorithm more efficient: right now, if 9 of my tasks finish in 1 minute and the 10th takes another 10 minutes, I have to wait for all 10 to complete even though 9 slots are free for other tasks to use.

Is there a way that, as soon as a task completes, I can immediately start another task within the same level (foreach loop)? I saw that a ConcurrentDictionary can be used. Can you please guide me and provide some sample code?

public async Task Test()
{
    List<Task> listoftasks = new List<Task>();

    foreach (var level in levels)
    {
        Queue<Model1> queue = new Queue<Model1>(Store);

        while (queue.Count > 0)
        {
            // start a batch of up to 10 tasks
            for (int i = 0; i < 10; i++)
            {
                if (!queue.TryDequeue(out Model1 item))
                {
                    break;
                }
                listoftasks.Add(Task.Run(() => Dosomething(item)));
            }

            // wait for the whole batch to finish before starting the next one
            await Task.WhenAll(listoftasks);
            listoftasks.Clear();
        }
    }
}

Upvotes: 0

Views: 1398

Answers (3)

TheGeneral

Reputation: 81583

Personally I'd use an ActionBlock (from the TPL Dataflow library). It has

  1. Built-in MaxDegreeOfParallelism
  2. Can easily deal with async IO-bound workloads, or non-async CPU-bound workloads
  3. Has cancellation support (if needed)
  4. Can be built into larger pipelines
  5. Can run as a perpetual consumer in a multi-producer environment

Given

private ActionBlock<Model> _processor;

Setup

_processor = new ActionBlock<Model>(
   DoSomething,
   new ExecutionDataflowBlockOptions()
   {
      CancellationToken = SomeCancelationTokenIfNeeded,
      MaxDegreeOfParallelism = 10,      // at most 10 items are processed concurrently
      SingleProducerConstrained = true  // set only if a single thread posts to the block
   });

Some Method

public static void DoSomething(Model item)
{ ... }

Usage

await _processor.SendAsync(someItem);
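
For the question's scenario, a minimal sketch (assuming Store holds the items, with the answer's Model standing in for the question's Model1, and DoSomething being the per-item work) could push everything into the block and then wait for it to drain:

foreach (var item in Store)
    await _processor.SendAsync(item);

_processor.Complete();        // signal that no more items will be posted
await _processor.Completion;  // wait until every queued item has been processed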

Upvotes: 1

Charlieface

Reputation: 72480

You can have each Task dequeue items itself. Use a ConcurrentQueue to ensure thread safety.

It's kind of a poor man's scheduler, but it's very lightweight.

ConcurrentQueue<Model1> queue;

void Dequeue()
{
    // each worker keeps pulling items until the queue is empty
    while (queue.TryDequeue(out var item))
        DoSomething(item);
}

public async Task Test()
{
    queue = new ConcurrentQueue<Model1>(Store);
    var listoftasks = new List<Task>();

    // one worker per core; each picks up a new item as soon as it finishes the previous one
    for (var i = 0; i < Environment.ProcessorCount; i++)
        listoftasks.Add(Task.Run(() => Dequeue()));

    await Task.WhenAll(listoftasks);
}

Note: this does not handle exceptions; an unhandled exception will fault that worker task and stop it, so exceptions must be handled or swallowed inside (or around) DoSomething.
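
For example, a minimal sketch of a worker that logs a failure and keeps going (Console.WriteLine is just a placeholder for whatever logging you use) might look like:

void Dequeue()
{
    while (queue.TryDequeue(out var item))
    {
        try
        {
            DoSomething(item);
        }
        catch (Exception ex)
        {
            // log and continue so one failing item doesn't stop this worker
            Console.WriteLine($"Item failed: {ex.Message}");
        }
    }
}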

Upvotes: 1

Serg

Reputation: 4696

You can just use LimitedConcurrencyLevelTaskScheduler (the sample implementation from the TaskScheduler docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0) to achieve the desired behaviour. In this case you can push all the tasks at once and they will be executed with the desired level of concurrency (no more than 10 tasks in parallel, in your case).
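
A rough usage sketch, assuming the LimitedConcurrencyLevelTaskScheduler sample class from that page has been copied into the project, and Store/DoSomething are the items and per-item work from the question:

// scheduler that allows at most 10 tasks to run concurrently
var scheduler = new LimitedConcurrencyLevelTaskScheduler(10);
var factory = new TaskFactory(scheduler);

// queue everything up front; the scheduler limits how many run at once
var tasks = new List<Task>();
foreach (var item in Store)
    tasks.Add(factory.StartNew(() => DoSomething(item)));

await Task.WhenAll(tasks);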

Upvotes: 2
