Reputation: 1736
Is there a way in the new async .NET 4.5 library to set a timeout on the Task.WhenAll method? I want to fetch several sources, stop after, say, 5 seconds, and skip the sources that haven't finished.
Upvotes: 77
Views: 35960
Reputation: 43996
I tried to improve on i3arnon's excellent solution in order to fix some minor issues, but I ended up with a completely different implementation. The two issues that I tried to solve are:
timeout
.
Leaking an active Task.Delay might result in a non-negligible amount of leaked memory if WhenAll is called in a loop and the timeout is large.
On top of that, I added a cancellationToken argument, XML documentation that explains what this method is doing, and argument validation. Here it is:
/// <summary>
/// Returns a task that will complete when all of the tasks have completed,
/// or when the timeout has elapsed, or when the token is canceled, whichever
/// comes first. In case the tasks complete first, the task contains the
/// results/exceptions of all the tasks. In case the timeout elapsed first,
/// the task contains the results/exceptions of the completed tasks only.
/// In case the token is canceled first, the task is canceled. To determine
/// whether a timeout has occurred, compare the number of the results with
/// the number of the tasks.
/// </summary>
public static Task<TResult[]> WhenAll<TResult>(
    Task<TResult>[] tasks,
    TimeSpan timeout, CancellationToken cancellationToken = default)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    tasks = tasks.ToArray(); // Defensive copy
    if (tasks.Any(t => t == null)) throw new ArgumentException(
        $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
    if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeout));
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<TResult[]>(cancellationToken);

    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(timeout);
    var continuationOptions = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;
    var continuations = tasks.Select(task => task.ContinueWith(_ => { },
        cts.Token, continuationOptions, TaskScheduler.Default));

    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        if (allContinuations.IsCompletedSuccessfully)
            return Task.WhenAll(tasks); // No timeout or cancellation occurred
        Debug.Assert(allContinuations.IsCanceled);
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<TResult[]>(cancellationToken);
        // Now we know that timeout has occurred
        return Task.WhenAll(tasks.Where(task => task.IsCompleted));
    }, default, continuationOptions, TaskScheduler.Default).Unwrap();
}
This WhenAll implementation elides async and await, which is not advisable in general. In this case it is necessary in order to propagate all the errors in a non-nested AggregateException. The intention is to simulate the behavior of the built-in Task.WhenAll method as accurately as possible.
Usage example:
string[] results;
Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
try
{
    results = await whenAllTask;
}
catch when (whenAllTask.IsFaulted) // It might also be canceled
{
    // Log all errors
    foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
    {
        _logger.LogError(innerEx, innerEx.Message);
    }
    throw; // Propagate the error of the first failed task
}
if (results.Length < tasks.Length) throw new TimeoutException();
return results;
Note: the above API has a design flaw. If at least one of the tasks has failed or has been canceled, there is no way to determine whether a timeout has occurred. The Exception.InnerExceptions property of the task returned by WhenAll may contain the exceptions of all the tasks or of only some of them, and there is no way to tell which. Unfortunately I can't think of a solution to this problem.
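One possible way around the ambiguity described in the note is to change the shape of the API rather than to fix the method above: return the completed tasks themselves together with an explicit flag, and let the caller inspect each task. The following is only a sketch under that assumption. The names WhenAllOutcome and WhenAllOrTimeout are hypothetical, and unlike the built-in Task.WhenAll this version does not propagate exceptions through the returned task.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical result type, not part of the answer above.
public sealed class WhenAllOutcome<TResult>
{
    public WhenAllOutcome(bool timedOut, IReadOnlyList<Task<TResult>> completedTasks)
    {
        TimedOut = timedOut;
        CompletedTasks = completedTasks;
    }

    // True when at least one task had not finished when the wait ended.
    public bool TimedOut { get; }

    // Tasks that finished in time: succeeded, faulted, or canceled.
    public IReadOnlyList<Task<TResult>> CompletedTasks { get; }
}

public static class WhenAllOutcomeSketch
{
    public static async Task<WhenAllOutcome<TResult>> WhenAllOrTimeout<TResult>(
        Task<TResult>[] tasks, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task all = Task.WhenAll(tasks);
            // Awaiting WhenAny does not throw when 'all' faults, so failures
            // in 'tasks' stay on the individual tasks.
            await Task.WhenAny(all, Task.Delay(timeout, cts.Token));
            cts.Cancel(); // Stop the timer in case the tasks won the race.

            List<Task<TResult>> completed = tasks.Where(t => t.IsCompleted).ToList();
            return new WhenAllOutcome<TResult>(completed.Count < tasks.Length, completed);
        }
    }
}
```

With this shape the caller checks TimedOut first and then examines the Status, Result, or Exception of each entry in CompletedTasks, so a failed task and a timeout can be told apart.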
Upvotes: 0
Reputation: 18474
Check out the "Early Bailout" and "Task.Delay" sections from Microsoft's Consuming the Task-based Asynchronous Pattern.
Early bailout. An operation represented by t1 can be grouped in a WhenAny with another task t2, and we can wait on the WhenAny task. t2 could represent a timeout, or cancellation, or some other signal that will cause the WhenAny task to complete prior to t1 completing.
Upvotes: 9
Reputation: 938
You can use the following code:
var timeoutTime = 10; // In minutes, see TimeSpan.FromMinutes below
var tasksResult = await Task.WhenAll(
    listOfTasks.Select(x => Task.WhenAny(
        x, Task.Delay(TimeSpan.FromMinutes(timeoutTime)))
    )
);
var succeededtasksResponses = tasksResult
    .OfType<Task<MyResult>>()
    .Select(task => task.Result);
if (succeededtasksResponses.Count() != listOfTasks.Count())
{
    // Not all tasks were completed
    // Throw error or do whatever you want
}
// succeededtasksResponses now contains the responses of the tasks that completed in time
How it works:
Set timeoutTime to the maximum time allowed for the tasks to complete (in minutes here, because of TimeSpan.FromMinutes). Each task is raced against its own Task.Delay, so no task is awaited for longer than that limit. If every task finishes in time, no timeout occurs and tasksResult contains only your own tasks.
After that, OfType keeps only the tasks that completed in time. The tasks that did not complete contribute no results.
Upvotes: 0
Reputation: 3704
Here is a void-result version of @i3arnon's answer, with comments added and the first argument changed into an extension method parameter (this).
I've also got a forwarding method that takes the timeout as an int and uses TimeSpan.FromMilliseconds(millisecondsTimeout), to match other Task methods.
public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
{
    // Create a timeout task.
    var timeoutTask = Task.Delay(timeout);
    // Get the completed tasks made up of...
    var completedTasks =
    (
        // ...all tasks specified
        await Task.WhenAll(tasks
            // Now finish when its task has finished or the timeout task finishes
            .Select(task => Task.WhenAny(task, timeoutTask)))
    )
    // ...but not the timeout task
    .Where(task => task != timeoutTask);
    // And wait for the internal WhenAll to complete.
    await Task.WhenAll(completedTasks);
}
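The forwarding method mentioned above is not shown in the answer. A minimal sketch of what it might look like follows, with the TimeSpan version repeated in condensed form so the block compiles on its own. The class name TaskTimeoutExtensions is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical container class; extension methods need a static class.
public static class TaskTimeoutExtensions
{
    // The TimeSpan version from the answer, condensed.
    public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
    {
        var timeoutTask = Task.Delay(timeout);
        var completedTasks =
            (await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask))))
            .Where(task => task != timeoutTask);
        await Task.WhenAll(completedTasks);
    }

    // Forwarding overload taking the timeout in milliseconds,
    // in the style of other Task methods such as Task.Delay(int).
    public static Task WhenAll(this IEnumerable<Task> tasks, int millisecondsTimeout)
    {
        return tasks.WhenAll(TimeSpan.FromMilliseconds(millisecondsTimeout));
    }
}
```

A call such as `await myTasks.WhenAll(5000);` then waits at most about five seconds and rethrows the exception of a task that failed within that time.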
Upvotes: 1
Reputation: 1937
In addition to the timeout, I also check for cancellation, which is useful if you are building a web app.
public static async Task WhenAll(
    IEnumerable<Task> tasks,
    int millisecondsTimeOut,
    CancellationToken cancellationToken)
{
    // The delay tasks are deliberately not wrapped in using blocks:
    // Task.Dispose throws InvalidOperationException on a task that has not completed.
    Task timeoutTask = Task.Delay(millisecondsTimeOut);
    Task cancellationMonitorTask = Task.Delay(-1, cancellationToken);

    Task completedTask = await Task.WhenAny(
        Task.WhenAll(tasks),
        timeoutTask,
        cancellationMonitorTask
    );
    if (completedTask == timeoutTask)
    {
        throw new TimeoutException();
    }
    if (completedTask == cancellationMonitorTask)
    {
        throw new OperationCanceledException(cancellationToken);
    }
    await completedTask; // Rethrows if any of the tasks failed
}
Upvotes: 2
Reputation: 116666
I think a clearer, more robust option that also does exception handling right would be to use Task.WhenAny on each task together with a timeout task, go through all the completed tasks and filter out the timeout ones, and use await Task.WhenAll() instead of Task.Result to gather all the results.
Here's a complete working solution:
static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
{
    var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
    var completedTasks =
        (await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
        Where(task => task != timeoutTask);
    return await Task.WhenAll(completedTasks);
}
Upvotes: 29
Reputation: 245066
You could combine the resulting Task with a Task.Delay() using Task.WhenAny():
await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));
If you want to harvest completed tasks in case of a timeout:
var completedResults =
    tasks
        .Where(t => t.Status == TaskStatus.RanToCompletion)
        .Select(t => t.Result)
        .ToList();
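Put together, the two snippets above might be used as in the following sketch. FetchAsync is a hypothetical stand-in for a real source, and the delays are arbitrary values chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class HarvestDemo
{
    // Hypothetical source: returns its own name after the given delay.
    private static async Task<string> FetchAsync(string name, TimeSpan delay)
    {
        await Task.Delay(delay);
        return name;
    }

    public static async Task<List<string>> RunAsync(TimeSpan timeout)
    {
        Task<string>[] tasks =
        {
            FetchAsync("fast", TimeSpan.FromMilliseconds(50)),
            FetchAsync("slow", TimeSpan.FromSeconds(30)),
        };

        // Completes when all tasks are done or the timeout elapses, whichever is first.
        await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));

        // Keep only the sources that finished successfully in time.
        return tasks
            .Where(t => t.Status == TaskStatus.RanToCompletion)
            .Select(t => t.Result)
            .ToList();
    }
}
```

Note that the tasks that miss the deadline are not stopped by this approach. They keep running in the background unless they are given a CancellationToken and that token is canceled.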
Upvotes: 129
Reputation: 1527
What you describe seems like a very common requirement, yet I could not find an example of it anywhere, and I searched a lot. I finally came up with the following:
TimeSpan timeout = TimeSpan.FromSeconds(5.0);
Task<Task>[] tasksOfTasks =
{
    Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
};
Task[] completedTasks = await Task.WhenAll(tasksOfTasks);
List<MyResult> results = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();
I assume here a method SomeTaskAsync that returns Task<MyResult>.
Among the members of completedTasks, only those of type Task<MyResult> are our own tasks that managed to beat the clock; Task.Delay returns a different type. This requires some compromise on typing, but it still works well and is quite simple.
(The array can of course be built dynamically using a query + ToArray).
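As an illustration of building the array dynamically, here is a minimal sketch. The MyResult class, the SomeTaskAsync body, and the source list with its delays are placeholders invented for the example, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DynamicWhenAnyDemo
{
    // Placeholder result type for the example.
    public sealed class MyResult
    {
        public string Source { get; set; }
    }

    // Placeholder worker: completes after delayMs milliseconds.
    private static async Task<MyResult> SomeTaskAsync(string source, int delayMs)
    {
        await Task.Delay(delayMs);
        return new MyResult { Source = source };
    }

    public static async Task<List<MyResult>> RunAsync()
    {
        TimeSpan timeout = TimeSpan.FromMilliseconds(500);
        var sources = new[]
        {
            new { Name = "a", DelayMs = 10 },
            new { Name = "b", DelayMs = 10 },
            new { Name = "c", DelayMs = 60000 }, // will miss the deadline
        };

        // One WhenAny race per source, built with a query + ToArray.
        Task<Task>[] tasksOfTasks = sources
            .Select(s => Task.WhenAny(SomeTaskAsync(s.Name, s.DelayMs), Task.Delay(timeout)))
            .ToArray();

        Task[] completedTasks = await Task.WhenAll(tasksOfTasks);

        // Only our own tasks are Task<MyResult>; the delay winners are filtered out.
        return completedTasks
            .OfType<Task<MyResult>>()
            .Select(task => task.Result)
            .ToList();
    }
}
```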
Upvotes: 2
Reputation: 5264
In addition to svick's answer, the following works for me when I have to wait for a couple of tasks to complete but have to process something else while I'm waiting:
Task[] TasksToWaitFor = //Your tasks
TimeSpan Timeout = TimeSpan.FromSeconds( 30 );

while( true )
{
    await Task.WhenAny( Task.WhenAll( TasksToWaitFor ), Task.Delay( Timeout ) );
    if( TasksToWaitFor.All( a => a.IsCompleted ) )
        break;

    //Do something else here
}
Upvotes: 0
Reputation: 3504
Check out a custom task combinator proposed in http://tutorials.csharp-online.net/Task_Combinators
async static Task<TResult> WithTimeout<TResult>
    (this Task<TResult> task, TimeSpan timeout)
{
    Task winner = await (Task.WhenAny
        (task, Task.Delay (timeout)));
    if (winner != task) throw new TimeoutException();
    return await task; // Unwrap result/re-throw
}
I have not tried it yet.
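For the scenario in the question (fetch several sources and skip the slow ones), a combinator like this could be applied to each source separately. The sketch below repeats WithTimeout so that it compiles on its own; FetchAsync and the delays are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WithTimeoutDemo
{
    public static async Task<TResult> WithTimeout<TResult>(
        this Task<TResult> task, TimeSpan timeout)
    {
        Task winner = await Task.WhenAny(task, Task.Delay(timeout));
        if (winner != task) throw new TimeoutException();
        return await task; // Unwrap result/re-throw
    }

    // Hypothetical source: returns its own name after delayMs milliseconds.
    private static async Task<string> FetchAsync(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        return name;
    }

    public static async Task<List<string>> RunAsync()
    {
        TimeSpan timeout = TimeSpan.FromMilliseconds(500);

        // All sources are started up front, so the timeouts run concurrently.
        Task<string>[] wrapped =
        {
            FetchAsync("fast", 10).WithTimeout(timeout),
            FetchAsync("slow", 60000).WithTimeout(timeout),
        };

        var results = new List<string>();
        foreach (Task<string> task in wrapped)
        {
            try
            {
                results.Add(await task);
            }
            catch (TimeoutException)
            {
                // This source did not finish in time: skip it.
            }
        }
        return results;
    }
}
```

Because every wrapped task starts before the first await, the total wait is roughly one timeout period rather than one per source.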
Upvotes: 1
Reputation: 1736
I came to the following piece of code that does what I needed:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net.Http;
using System.Json;
using System.Threading;
namespace MyAsync
{
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
Console.WriteLine("Start Main");
List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
listoftasks.Add(GetGoogle(cts));
listoftasks.Add(GetTwitter(cts));
listoftasks.Add(GetSleep(cts));
listoftasks.Add(GetxSleep(cts));
List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
List<MyObject> answer = new List<MyObject>();
foreach (List<MyObject> answers in arrayofanswers)
{
answer.AddRange(answers);
}
foreach (MyObject o in answer)
{
Console.WriteLine("{0} - {1}", o.name, o.origin);
}
Console.WriteLine("Press <Enter>");
Console.ReadLine();
}
static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetGoogle");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetGoogle GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetGoogle ReadAsStringAsync");
foreach (var r in data.responseData.results)
{
l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetTwitter");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetTwitter GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetTwitter ReadAsStringAsync");
foreach (var r in data.results)
{
l.Add(new MyObject() { name = r.text, origin = "twitter" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(5000,cts.Token);
l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
{
Console.WriteLine("Start GetxSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(2000);
cts.Cancel();
l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
return l;
}
}
}
My explanation is in my blogpost: http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html
Upvotes: 0
Reputation: 13589
Seems like the Task.WaitAll overload with the timeout parameter is all you need: if it returns true, you know they all completed; otherwise, you can filter on IsCompleted. Note that WaitAll blocks the calling thread.
if (!Task.WaitAll(tasks, myTimeout))
{
    // tasks is a Task[], so materialize the filtered sequence back into an array
    tasks = tasks.Where(t => t.IsCompleted).ToArray();
}
...
Upvotes: 0