Keaneum

Reputation: 33

How can I await an enumerable of tasks and stop when a number of tasks have completed?

I have an array of tasks running identical jobs with different parameters on different servers. One of the servers may be unresponsive or slow, which leads to a situation where every task but one has completed. At the moment I'm awaiting them with Task.WhenAll(), so I have no option but to wait until my timeout expires.

In the ideal case all tasks complete within the timeout and I can gather all the results. In the other case, this is what I want:

Once n tasks have completed, wait another x minutes. If not all tasks have completed by then, retrieve the results of the tasks that did finish.

Is there any way I can achieve the above?

Upvotes: 1

Views: 824

Answers (3)

Jesse de Wit

Reputation: 4177

Even when you have complex logic for cancellation, you want to cancel the underlying tasks. If the underlying tasks are cancelled at the right time, you can use Task.WhenAll in any case.

So, breaking down your question, what you're asking is: 'How can I cancel tasks based on the state of other tasks?' You need to keep track of the number of completed tasks and cancel the remaining tasks based on that state.

If you need to do 'stuff' when tasks complete (like updating the count of completed tasks), I find continuations to be a helpful and quite clean solution. An example for your use case:

// n from your question
var n = 4; 

// number of tasks currently completed
var tasksCompleted = 0; 

// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();

// x from your question (5 is only a placeholder value)
var x = 5;

// delay before cancellation after n tasks completed
var timeAfterNCompleted = TimeSpan.FromMinutes(x);
using var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    // Do your work with a passed cancellationtoken you control
    var currentTask = DoWorkAsync(i, cts.Token);

    // Continuation will update the state of completed tasks.
    // It gets its own variable: ContinueWith returns a plain Task,
    // which cannot be assigned back if DoWorkAsync returns Task<T>.
    var continuation = currentTask.ContinueWith((t) =>
    {
        if (t.IsCompletedSuccessfully)
        {
            var number = Interlocked.Increment(ref tasksCompleted);
            if (number == n)
            {
                // Once n tasks have completed successfully,
                // we cancel after the grace period.
                // Note that this will actually cancel the underlying tasks
                // because we passed the token to the DoWorkAsync method.
                cts.CancelAfter(timeAfterNCompleted);
            }
        }
    });
    tasks.Add(continuation);
}

await Task.WhenAll(tasks);

// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)
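
A minimal sketch of the separate-lists variant mentioned in the closing comments, assuming DoWorkAsync returns Task<string>. The DoWorkAsync body below is a hypothetical stand-in (job 9 plays the slow server), and the names GracePeriodDemo and RunAsync are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class GracePeriodDemo
{
    // Hypothetical stand-in for the real work: job 9 simulates the slow server.
    private static async Task<string> DoWorkAsync(int i, CancellationToken token)
    {
        var delay = i == 9
            ? TimeSpan.FromSeconds(30)
            : TimeSpan.FromMilliseconds(50 * (i + 1));
        await Task.Delay(delay, token);
        return $"result {i}";
    }

    public static async Task<List<string>> RunAsync(int n, TimeSpan grace)
    {
        var tasksCompleted = 0;
        var work = new List<Task<string>>();   // the actual tasks
        var continuations = new List<Task>();  // the bookkeeping continuations
        using var cts = new CancellationTokenSource();

        for (int i = 0; i < 10; i++)
        {
            var task = DoWorkAsync(i, cts.Token);
            work.Add(task);
            continuations.Add(task.ContinueWith(t =>
            {
                // Start the grace period when the n-th task succeeds.
                if (t.IsCompletedSuccessfully
                    && Interlocked.Increment(ref tasksCompleted) == n)
                {
                    cts.CancelAfter(grace);
                }
            }, TaskScheduler.Default));
        }

        // The continuations complete normally even when the task they
        // observe was cancelled, so this await does not throw.
        await Task.WhenAll(continuations);

        // Keep only the results of tasks that ran to completion.
        return work.Where(t => t.IsCompletedSuccessfully)
                   .Select(t => t.Result)
                   .ToList();
    }
}
```

Calling `await GracePeriodDemo.RunAsync(4, TimeSpan.FromSeconds(1))` with this stand-in should return the nine fast results and drop the cancelled one. Faulted tasks are silently skipped here; inspect `t.Exception` on the work list if you need to report them.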

Upvotes: 3

Aron

Reputation: 15772

Rx.Net is the most elegant way to achieve this.

public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
    var inputs = tasks
            // convert this into IObservable<TResult>
            // this type, like IAsyncEnumerable, contains
            // async logic, and cancellation...
            .ToObservable()
            .Select(task => Observable.FromAsync(task))
            .Merge()
            // publish/refcount is needed to ensure
            // we only run the tasks once, and share
            // the "result/event".
            .Publish()
            .RefCount();
    // On the 100th item
    var timeoutSignal = inputs.Skip(100 - 1)
                          .Take(1)
                          // Generate a signal 10 minutes after the 100th
                          // item arrives
                          .Delay(TimeSpan.FromMinutes(10));
    return inputs
            // Take items until the timeout signal
            .TakeUntil(timeoutSignal)
            .ToAsyncEnumerable();
    
}

var items = await DoStuff(tasks).ToListAsync();

Upvotes: 1

thewallrus

Reputation: 725

Use Task.WhenAny to find out when any task completes, then remove that completed task from your list.

stopWatch.Start();
while (arrayOfTasks.Any())
{
    Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
    arrayOfTasks.Remove(finishedTask);
    await finishedTask;
    finishedCount++;
    if (finishedCount == 4) // check your stopwatch's elapsed time here
    {
        Console.WriteLine("4 tasks have finished");
    }
}

Working example:

using System.Diagnostics;
using System.Security.Cryptography;

await Test.Go();
Console.ReadLine();
public static class Test
{
    public static async Task Go()
    {
        List<Task<string>> arrayOfTasks = GetArrayOfTasks();
        int finishedCount = 0;
        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();
        while (arrayOfTasks.Any())
        {
            Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
            arrayOfTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
            finishedCount++;
            if (finishedCount == 4) // check your stopwatch's elapsed time here too
            {
                Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
            }
        }
    }

    private static List<Task<string>> GetArrayOfTasks()
    {
        List<Task<string>> taskList = new();
        for (int i = 0; i < 10; i++)
        {
            var t = GetString(i);
            taskList.Add(t);
        }
        return taskList;
    }

    private static async Task<string> GetString(int i)
    {
        await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
        return i.ToString();
    }
}   
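
The loop above only reports when four tasks have finished; it never actually stops waiting. One possible way to add the grace period from the question is to race the remaining tasks against a Task.Delay that starts once n tasks are done. This is a rough sketch, not a drop-in solution; the names WhenAnyWithGrace and CollectAsync are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyWithGrace
{
    public static async Task<List<T>> CollectAsync<T>(
        IEnumerable<Task<T>> tasks, int n, TimeSpan grace)
    {
        var pending = tasks.ToList();
        var results = new List<T>();

        // Never completes until it is replaced by the real grace timer.
        Task deadline = Task.Delay(Timeout.InfiniteTimeSpan);
        var deadlineStarted = false;

        while (pending.Count > 0)
        {
            // Race the remaining tasks against the deadline.
            var finished = await Task.WhenAny(
                pending.Cast<Task>().Append(deadline));
            if (finished == deadline)
            {
                break; // grace period expired, give up on the rest
            }

            var done = (Task<T>)finished;
            pending.Remove(done);
            results.Add(await done); // rethrows if the task faulted

            if (!deadlineStarted && results.Count == n)
            {
                // n tasks are done: start the grace period.
                deadline = Task.Delay(grace);
                deadlineStarted = true;
            }
        }
        return results;
    }
}
```

Note that this only stops waiting. The unfinished tasks keep running in the background unless you also pass them a CancellationToken and cancel it after the loop.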

Upvotes: 0
