Mark Allison
Mark Allison

Reputation: 7228

How to spawn a number of threads async to call a method multiple times in C#?

I need to call a worker method multiple times to load data into a database. I want to do some parallelism with this and be able to specify the number of threads to use. I thought of using the mod operator to split the workload, but getting stuck on how to implement with async await.

So the async method must create n number of threads and then call the worker method so there are n streams of work happening in parallel. The worker method is synchronous.

I had a go at it, but quite sure how to implement what I want. Is there a pattern for this?

Some code I was playing around with:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace TestingAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            int threads = 3;
            int numItems = 10;
            Task task = ThreadIt(threads, numItems);
        }

        static async Task ThreadIt(int threads, int numItems)
        {
            Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
            for (int i = 0; i < numItems; i++)
            {
                Console.Write($"Adding item: {i} mod 1: {i % threads}. ");
                int task = await DoSomeWork(i%threads, 500);
            }
        }

        static async Task<int> DoSomeWork(int Item, int time)
        {
            Console.Write($"    Working.on item {Item}..");
            Thread.Sleep(time );
            Console.WriteLine($"done.");
            return Item;
        }
    }
}

EDIT:

I'm going to rephrase because maybe I wasn't clear in my requirements.

What I want is to create n number of threads. There will be x number of items to process and I want them to be queued up using mod (or something else) and then processed in order in parallel across the n threads. When one item has finished, I want the next item to be processed immediately and not wait for all three threads to finish. Some items will take longer to process than others, maybe even up to 10 times longer, so other threads should not be waiting for one of the threads to complete.

For example if we have 3 threads and 9 items, this would happen:

thread1: items 0,3,6 
thread2: items 1,4,7
thread3: items 2,5,8

each thread processes it's workload in order and does not wait in between each item.

Upvotes: 0

Views: 357

Answers (2)

Stephen Cleary
Stephen Cleary

Reputation: 456457

I need to call a worker method multiple times to load data into a database. I want to do some parallelism with this and be able to specify the number of threads to use... The worker method is synchronous... Is there a pattern for this?

Yes, the Task Parallel Library.

Given:

static int DoSomeWork(int Item, int time)
{
  Console.Write($"    Working.on item {Item}..");
  Thread.Sleep(time);
  Console.WriteLine($"done.");
  return Item;
}

You can parallelize it as such:

static List<int> ThreadIt(int threads, int numItems)
{
  Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
  var items = Enumerable.Range(0, numItems);
  return items.AsParallel().WithDegreeOfParallelism(threads)
      .Select(i => DoSomeWork(i, 500))
      .ToList();
}

Upvotes: 1

Ehsan Sajjad
Ehsan Sajjad

Reputation: 62488

You can try creating a List<Task<T>> and start them and then await it with WhenAll if you want all tasks to be completed or WhenAny if any of them completes:

static async Task ThreadIt(int threads, int numItems)
{
      List<Task<int>> tasks = new List<Task<int>>();
      Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
      for (int i = 0; i < numItems; i++)
      {
           Console.Write($"Adding item: {i} mod 1: {i % threads}. ");
           tasks.Add(DoSomeWork(i%threads, 500));
      }

       var result = await Task.WhenAll(tasks);
}

and when using Task, async and await we should be using Task.Delay instead of Thread.Sleep:

static async Task<int> DoSomeWork(int Item, int time)
{
      Console.Write($"    Working.on item {Item}..");
      await Task.Delay(time); // note this 
      Console.WriteLine($"done.");
      return Item;
}

EDIT:

You can create a ConcurrentQueue and then dequeue each time when 3 Tasks complete and generate next 3 like:

static async Task ThreadIt(int threads, int numItems)
{
      ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
      Enumerable.Range(0, 10).ForEach(x => queue.Enqueue(x));
      List<Task<int>> tasks = new List<Task<int>>();
      Console.WriteLine($"Item limit: {numItems}, threads: {threads}");
      while (!queue.IsEmpty)
      {
          for (int i = 0; i < threads; i++)
          {
              if(queue.TryDequeue(out int val))
              {
                   Console.Write($"Adding item: {val} mod 1: {val % threads}. ");
                   tasks.Add(DoSomeWork(val%threads, 500));
              }
          }

       var result = await Task.WhenAll(tasks);
     }
}

Upvotes: 2

Related Questions