Reputation:
I created the following method, TestThrottled, to try to throttle my tasks, but it is not throttling at all: calling Task.WhenAll directly and calling this method take the same elapsed time. Am I doing something wrong?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static async Task<T[]> TestThrottled<T>(List<Task<T>> tasks, int maxDegreeOfParallelism)
    {
        var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasksParallelized = new List<Task<T>>();
        foreach (var task in tasks)
        {
            var taskParallelized = Task.Run(async () =>
            {
                try
                {
                    await semaphore.WaitAsync();
                    return await task;
                }
                finally
                {
                    semaphore.Release();
                }
            });
            tasksParallelized.Add(taskParallelized);
        }
        return await Task.WhenAll(tasksParallelized);
    }

    private static async Task<int> TestAsync()
    {
        await Task.Delay(1000);
        return 1;
    }

    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Task<int>>();
        var ints = new List<int>();
        for (int i = 0; i < 30; i++)
        {
            tasks.Add(TestAsync());
        }
        ints.AddRange(await TestThrottled(tasks, 1));
        Console.WriteLine($"{sw.ElapsedMilliseconds}, count: {ints.Count}");
        Console.ReadLine();
    }
}
Upvotes: 3
Views: 1581
Reputation: 43683
The key to solving this problem is to let the throttler start the tasks, instead of starting them beforehand. And since starting tasks explicitly with the old Task.Start method is very restrictive (it predates, and can't take advantage of, the async/await mechanism), the only alternative is to let the throttler create the tasks. There are multiple ways to do this. One is to accept a list of task factories:
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Func<Task<TResult>>> taskFactories,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var taskFactory in taskFactories)
        //...
        var task = taskFactory();
        TResult result = await task;
}
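For illustration, here is one way the elided parts of the factory-based signature could be filled in. This is a sketch, not the answerer's code: the class name FactoryThrottler and the helper RunAndRelease are hypothetical, and it assumes C# 8 (for the using declaration). What it shows is that the semaphore slot is acquired before each factory is invoked, so no task exists until a slot is free, and no Task.Run is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; a sketch of the "throttler creates the tasks" idea.
public static class FactoryThrottler
{
    public static async Task<TResult[]> RunAsyncThrottled<TResult>(
        IEnumerable<Func<Task<TResult>>> taskFactories,
        int maxDegreeOfParallelism)
    {
        if (taskFactories == null) throw new ArgumentNullException(nameof(taskFactories));
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        var tasks = new List<Task<TResult>>();
        foreach (var taskFactory in taskFactories)
        {
            // Wait for a free slot BEFORE creating (and thereby starting) the next task.
            await semaphore.WaitAsync().ConfigureAwait(false);
            tasks.Add(RunAndRelease(taskFactory, semaphore));
        }
        // Task.WhenAll keeps the results in the same order as the factories.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    private static async Task<TResult> RunAndRelease<TResult>(
        Func<Task<TResult>> taskFactory, SemaphoreSlim semaphore)
    {
        try
        {
            // A synchronous throw from the factory also lands here and still releases the slot.
            return await taskFactory().ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
    }
}
```

Because Task.WhenAll returns results in the order of the tasks passed to it, the output array lines up with the input factories even though the tasks finish in arbitrary order.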
Another is to accept a list of items together with a task factory that is invoked for each item:
private static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var item in items)
        //...
        var task = taskFactory(item);
        TResult result = await task;
}
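The items-plus-factory signature can also be filled in with a different technique than a semaphore: a fixed pool of workers. The sketch below (WorkerPoolThrottler is a hypothetical name; assumes C# 8) starts maxDegreeOfParallelism worker tasks that each pull the next item from a shared enumerator under a lock, await its task, and repeat. Results are re-sorted by their original index so they match the order of the items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical name; a worker-pool alternative to the semaphore approach.
public static class WorkerPoolThrottler
{
    public static async Task<TResult[]> RunAsyncThrottled<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, Task<TResult>> taskFactory,
        int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        var gate = new object();
        using var enumerator = items.GetEnumerator();
        int nextIndex = 0;
        var collected = new List<(int Index, TResult Result)>();

        // Each worker handles one item at a time, so at most
        // maxDegreeOfParallelism factory tasks are in flight.
        async Task Worker()
        {
            while (true)
            {
                TSource item;
                int index;
                lock (gate) // the enumerator is not thread-safe, so guard MoveNext/Current
                {
                    if (!enumerator.MoveNext()) return;
                    item = enumerator.Current;
                    index = nextIndex++;
                }
                TResult result = await taskFactory(item).ConfigureAwait(false);
                lock (gate) { collected.Add((index, result)); }
            }
        }

        var workers = Enumerable.Range(0, maxDegreeOfParallelism).Select(_ => Worker()).ToArray();
        await Task.WhenAll(workers).ConfigureAwait(false);
        // Restore the source order, since workers finish in arbitrary order.
        return collected.OrderBy(x => x.Index).Select(x => x.Result).ToArray();
    }
}
```

A side effect of this design is that the source enumerable is read by one worker at a time and only on demand, so a lazy IEnumerable<TSource> is consumed incrementally rather than all at once.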
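A third is to accept a deferred (not yet materialized) enumerable of tasks, for example one produced by an iterator method that uses yield, so that each task is created only when the throttler enumerates it:
private static async Task<TResult[]> RunAsyncThrottled<TResult>(
    IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)
{
    if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
        "The enumerable should not be materialized.", nameof(tasks));
    //...
    foreach (var task in tasks)
        //...
        TResult result = await task;
}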
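A sketch of how the deferred-enumerable variant could work, assuming C# 8; LazyTaskThrottler and ReleaseWhenDone are hypothetical names. The detail that matters is that the semaphore is acquired before calling MoveNext, because for a lazy sequence it is MoveNext that creates, and therefore starts, the next task. A plain foreach would create each task before its body got a chance to wait.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; a sketch of throttling a lazily produced sequence of tasks.
public static class LazyTaskThrottler
{
    public static async Task<TResult[]> RunAsyncThrottled<TResult>(
        IEnumerable<Task<TResult>> tasks, int maxDegreeOfParallelism)
    {
        // A materialized collection means every task already exists (and is running),
        // so there would be nothing left to throttle.
        if (tasks is ICollection<Task<TResult>>) throw new ArgumentException(
            "The enumerable should not be materialized.", nameof(tasks));

        using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
        using var enumerator = tasks.GetEnumerator();
        var started = new List<Task<TResult>>();
        while (true)
        {
            // Take a slot first: the following MoveNext is what creates the next task.
            await semaphore.WaitAsync().ConfigureAwait(false);
            if (!enumerator.MoveNext())
            {
                semaphore.Release();
                break;
            }
            started.Add(ReleaseWhenDone(enumerator.Current, semaphore));
        }
        return await Task.WhenAll(started).ConfigureAwait(false);
    }

    private static async Task<TResult> ReleaseWhenDone<TResult>(
        Task<TResult> task, SemaphoreSlim semaphore)
    {
        try { return await task.ConfigureAwait(false); }
        finally { semaphore.Release(); } // free the slot once this task is done
    }
}
```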
Since C# 8 has now been released, there is also an alternative for the method's return type: instead of returning Task<TResult[]>, it can return IAsyncEnumerable<TResult>, which allows asynchronous enumeration with await foreach.
private static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
    IEnumerable<TSource> items, Func<TSource, Task<TResult>> taskFactory,
    int maxDegreeOfParallelism)
{
    //...
    foreach (var item in items)
        //...
        yield return await taskFactory(item);
}
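A minimal sketch of the IAsyncEnumerable<TResult> variant, assuming C# 8 and .NET Core 3.0 or later. AsyncStreamThrottler is a hypothetical name, and this uses a simpler technique than a semaphore: a queue (window) of in-flight tasks. When the window already holds maxDegreeOfParallelism tasks, the oldest one is awaited and yielded before the next task is created, so results come out in source order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical name; a sliding-window sketch of a throttled async stream.
public static class AsyncStreamThrottler
{
    public static async IAsyncEnumerable<TResult> RunAsyncThrottled<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, Task<TResult>> taskFactory,
        int maxDegreeOfParallelism)
    {
        // Note: in an async iterator this check runs on the first MoveNextAsync, not at the call.
        if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

        var window = new Queue<Task<TResult>>();
        foreach (var item in items)
        {
            // Window full: finish (and yield) the oldest task before starting another.
            if (window.Count == maxDegreeOfParallelism)
                yield return await window.Dequeue().ConfigureAwait(false);
            window.Enqueue(taskFactory(item));
        }
        // Drain whatever is still in flight, in order.
        while (window.Count > 0)
            yield return await window.Dequeue().ConfigureAwait(false);
    }
}
```

The trade-off is head-of-line blocking: if the oldest task is slow, no new task starts even when later ones have already finished. A consumer would use it as `await foreach (var result in AsyncStreamThrottler.RunAsyncThrottled(items, factory, 4)) { ... }`.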
Upvotes: 2
Reputation: 81513
Another way to do this is with TPL Dataflow. It already has everything you need, can cater for more complex pipelining if needed, and is a lot more configurable. It also saves you from offloading each item to another task with Task.Run, as in your example solution.
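using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

private static async Task<IList<T>> TestThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
{
    var options = new ExecutionDataflowBlockOptions() { EnsureOrdered = false, MaxDegreeOfParallelism = maxDegreeOfParallelism };
    // The block invokes each factory itself, so a task is only created when a slot is free
    var transform = new TransformBlock<Func<Task<T>>, T>(func => func.Invoke(), options);
    var outputBufferBlock = new BufferBlock<T>();
    transform.LinkTo(outputBufferBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    foreach (var task in tasks)
        transform.Post(task);
    transform.Complete();
    // Await the transform block, not the buffer: a BufferBlock's Completion does not
    // finish while it still holds unreceived items, so awaiting it here would never return
    await transform.Completion;
    outputBufferBlock.TryReceiveAll(out var result);
    return result ?? new List<T>(); // TryReceiveAll yields null when there were no items
}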
Upvotes: 4
Reputation:
I solved my problem (creating a generic throttled task runner that receives a list of async methods) as follows:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
    {
        var tasksParallelized = new List<Task<T>>();
        using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            foreach (var task in tasks)
            {
                var taskParallelized = Task.Run(async () =>
                {
                    // Acquire a slot before invoking the factory, so the task is only created when allowed
                    await semaphore.WaitAsync();
                    try
                    {
                        return await task.Invoke();
                    }
                    finally
                    {
                        semaphore.Release();
                    }
                });
                tasksParallelized.Add(taskParallelized);
            }
            return await Task.WhenAll(tasksParallelized);
        }
    }

    private static async Task<int> TestAsync(int num)
    {
        await Task.Delay(1000);
        return 1 + num;
    }

    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Func<Task<int>>>();
        var ints = new List<int>();
        for (int i = 0; i < 10; i++)
        {
            tasks.Add(() => TestAsync(12000));
        }
        ints.AddRange(await RunAsyncThrottled(tasks, 1000));
        Console.WriteLine($"{sw.Elapsed.TotalMilliseconds}, count: {ints.Count}");
        Console.ReadLine();
    }
}
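Note that this Main passes maxDegreeOfParallelism = 1000 with only 10 tasks, so the limit never kicks in and the run takes about 1 second whether or not throttling works. Below is a sketch of a check that does exercise the limit: it contains a copy of the RunAsyncThrottled method above and instruments the factories with a counter. ThrottleCheck and MeasureAsync are hypothetical names. With N tasks of D ms each and a limit of M, the expected run time is roughly ceil(N / M) * D, e.g. 6 tasks of 100 ms with a limit of 2 take about 3 * 100 = 300 ms.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleCheck
{
    // Copy of the RunAsyncThrottled approach above (Task.Run + SemaphoreSlim).
    public static async Task<T[]> RunAsyncThrottled<T>(IEnumerable<Func<Task<T>>> tasks, int maxDegreeOfParallelism)
    {
        var tasksParallelized = new List<Task<T>>();
        using (var semaphore = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            foreach (var task in tasks)
            {
                tasksParallelized.Add(Task.Run(async () =>
                {
                    await semaphore.WaitAsync();
                    try { return await task.Invoke(); }
                    finally { semaphore.Release(); }
                }));
            }
            return await Task.WhenAll(tasksParallelized);
        }
    }

    // Hypothetical helper: runs `count` delays of `delayMs` through the throttler and
    // reports the peak number of delays in flight and the total elapsed time.
    public static async Task<(int Peak, long ElapsedMs)> MeasureAsync(int count, int delayMs, int maxDegreeOfParallelism)
    {
        int current = 0, peak = 0;
        var gate = new object();
        var factories = new List<Func<Task<int>>>();
        for (int i = 0; i < count; i++)
        {
            factories.Add(async () =>
            {
                lock (gate) { current++; if (current > peak) peak = current; }
                await Task.Delay(delayMs);
                lock (gate) { current--; }
                return 1;
            });
        }
        var sw = Stopwatch.StartNew();
        await RunAsyncThrottled(factories, maxDegreeOfParallelism);
        return (peak, sw.ElapsedMilliseconds);
    }
}
```

Running the question's scenario with a limit below the task count, e.g. 10 one-second tasks with a limit of 2, should then take about 5 seconds instead of 1.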
Upvotes: 4
Reputation: 32617
The main problem here is the behavior of async/await. Consider what happens when you call:
private static async Task<int> TestAsync()
{
    await Task.Delay(1000);
    return 1;
}

TestAsync();
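TestAsync() gets called. Within that method, Task.Delay() gets called, which creates a task that finishes after 1000 ms. Finally, you return that task (strictly speaking, a different task, which completes once the rest of the method has run as a continuation of the task returned by Task.Delay()). The important point is that the 1000 ms countdown starts at the moment of the call, not when somebody awaits the returned task.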
You create all of these tasks at approximately the same time in your loop in Main(). Therefore, although you have a semaphore in place that prevents multiple threads from executing await task at the same time, the tasks are all scheduled to finish at about the same time anyway: await only waits while the task is not finished yet. So, as soon as the first thread releases the semaphore (after about a second), the next thread can enter the critical region, where it finds that its task has already finished (or is very close to finishing), and it can release the semaphore immediately. The same happens for the remaining tasks, so you get a total run time of about one second.
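The effect described above can be reproduced in isolation. In this sketch (HotTaskDemo is a hypothetical name, and the delay is shortened to 200 ms), a task returned by a TestAsync-style method finishes even though nothing awaits it, and awaiting five pre-created tasks one at a time behind a SemaphoreSlim(1), which is effectively what TestThrottled(tasks, 1) does, takes about one delay in total rather than five.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    // Same shape as the question's TestAsync, with a configurable delay.
    // ConfigureAwait(false) lets the continuation run on the thread pool,
    // so blocking the calling thread below cannot prevent completion.
    private static async Task<int> TestAsync(int delayMs)
    {
        await Task.Delay(delayMs).ConfigureAwait(false);
        return 1;
    }

    // The timer starts at the call; the task completes even though nobody awaits it.
    public static bool CompletesWithoutBeingAwaited()
    {
        Task<int> task = TestAsync(200); // the 200 ms countdown starts here
        Thread.Sleep(500);               // block longer than the delay, without awaiting
        return task.IsCompleted;
    }

    // Mirrors the question: create every task up front, then await them one at a
    // time behind a SemaphoreSlim(1). Only the first await actually has to wait.
    public static async Task<long> AwaitHotTasksOneAtATimeAsync(int count, int delayMs)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Task<int>>();
        for (int i = 0; i < count; i++)
            tasks.Add(TestAsync(delayMs)); // every delay starts now, concurrently

        using var semaphore = new SemaphoreSlim(1);
        foreach (var task in tasks)
        {
            await semaphore.WaitAsync().ConfigureAwait(false);
            try { await task.ConfigureAwait(false); }
            finally { semaphore.Release(); }
        }
        return sw.ElapsedMilliseconds; // roughly delayMs, not count * delayMs
    }
}
```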
Upvotes: 3