piotrwest
piotrwest

Reputation: 2166

Use Parallel.ForEach on method returning task - avoid Task.WaitAll

I've got a method which takes IWorkItem, starts work on it and returns related task. The method has to look like this because of external library used.

public Task WorkOn(IWorkItem workItem)
{
    //...start asynchronous operation, return task
}

I want to do this work on multiple work items. I don't know how many of them will be there - maybe 1, maybe 10 000. WorkOn method has internal pooling and may involve waiting if too many pararell executions will be reached. (like in SemaphoreSlim.Wait):

public Task WorkOn(IWorkItem workItem)
{
    _semaphoreSlim.Wait();
}

My current solution is:

public void Do(params IWorkItem[] workItems)
{
    var tasks = new Task[workItems.Length];
    for (var i = 0; i < workItems.Length; i++)
    {
        tasks[i] = WorkOn(workItems[i]);
    }
    Task.WaitAll(tasks);
}

Question: may I use somehow Parallel.ForEach in this case? To avoid creating 10000 tasks and later wait because of WorkOn's throttling?

Upvotes: 0

Views: 1079

Answers (1)

Nitram
Nitram

Reputation: 6716

That actually is not that easy. You can use Parallel.ForEach to throttle the amount of tasks that are spawned. But I am unsure how that will perform/behave in your condition.

As a general rule of thumb I usually try to avoid mixing Task and Parallel.

Surely you can do something like this:

public void Do(params IWorkItem[] workItems)
{
    Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}

Under "normal" conditions this should limit your concurrency nicely.

You could also go full async-await and add some limiting to your concurrency with some tricks. But you have to do the concurrency limiting yourself in that case.

const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
    var cursor = 0;
    var currentlyProcessing = new List<Task>(ConcurrencyLimit);
    while (cursor < workItems.Length)
    {
        while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
        {
            currentlyProcessing.Add(WorkOn(workItems[cursor]));
            cursor++;
        }

        Task finished = await Task.WhenAny(currentlyProcessing);
        currentlyProcessing.Remove(finished);
    }

    await Task.WhenAll(currentlyProcessing);
}

As I said... a lot more complicated. But it will limit the concurrency to any value you apply as well. In addition it properly uses the async-await pattern. If you don't want non-blocking multi threading you can easily wrap this function into another function and do a blocking .Wait on the task returned by this function.

In key in this implementation is the Task.WhenAny function. This function will return one finished task in the applied list of task (wrapped by another task for the await.

Upvotes: 2

Related Questions