Reputation: 974
I need to run many tasks in parallel as fast as possible, but if my program runs more than 30 tasks per second, it will be blocked. How can I ensure that no more than 30 tasks run in any 1-second interval?
In other words, a new task must not start if 30 tasks were already completed in the last 1-second interval.
My ugly possible solution:
private async Task Process(List<Task> taskList, int maxIntervalCount, int timeIntervalSeconds)
{
    var timeList = new List<DateTime>();
    var sem = new Semaphore(maxIntervalCount, maxIntervalCount);
    var tasksToRun = taskList.Select(async task =>
    {
        do
        {
            sem.WaitOne();
        }
        while (HasAllowance(timeList, maxIntervalCount, timeIntervalSeconds));
        await task;
        timeList.Add(DateTime.Now);
        sem.Release();
    });
    await Task.WhenAll(tasksToRun);
}

private bool HasAllowance(List<DateTime> timeList, int maxIntervalCount, int timeIntervalSeconds)
{
    return timeList.Count <= maxIntervalCount
        || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount];
}
Upvotes: 0
Views: 2007
Reputation: 43495
This problem can be solved with a SemaphoreSlim limited to the maximum number of tasks per interval, which is released by a Timer when the interval has elapsed after launching a task. The same idea has been used for answering a similar question (the RateLimiter class). Below is an implementation based on this idea, with a signature and behavior that mimic the .NET 6 Parallel.ForEachAsync API:
/// <summary>
/// Projects each element of the source sequence into a new form using an
/// asynchronous delegate, enforcing a limit on the number of concurrent
/// asynchronous operations that can start during a specified time span.
/// </summary>
public static Task<TResult[]> Parallel_ForEachAsync<TSource, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, CancellationToken, Task<TResult>> body,
    int maxActionsPerTimeUnit,
    TimeSpan timeUnit,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(body);
    if (maxActionsPerTimeUnit < 1)
        throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
    if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
        throw new ArgumentOutOfRangeException(nameof(timeUnit));

    SemaphoreSlim semaphore = new(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
    CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    List<System.Threading.Timer> timers = new();

    async Task<Task<TResult[]>> EnumerateSourceAsync()
    {
        // This method always completes successfully.
        // Any errors are wrapped in the tasks.
        List<Task<TResult>> tasks = new();
        try
        {
            foreach (TSource item in source)
            {
                try
                {
                    await semaphore.WaitAsync(cts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested)
                        tasks.Add(Task.FromCanceled<TResult>(cancellationToken));
                    // Otherwise the cancellation was caused by a task failure.
                    break;
                }

                // Launch the task
                Task<TResult> task = body(item, cts.Token).ContinueWith(t =>
                {
                    if (!t.IsCompletedSuccessfully) cts.Cancel();
                    // In case of cancellation propagate the correct token.
                    if (t.IsCanceled && cancellationToken.IsCancellationRequested)
                        return Task.FromCanceled<TResult>(cancellationToken);
                    return t;
                }, default, TaskContinuationOptions.DenyChildAttach |
                    TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default).Unwrap();
                tasks.Add(task);

                // Schedule the release of the semaphore using a Timer.
                System.Threading.Timer timer = new(_ => semaphore.Release());
                timer.Change(timeUnit, Timeout.InfiniteTimeSpan);
                timers.Add(timer);
            }
        }
        catch (Exception ex) { tasks.Add(Task.FromException<TResult>(ex)); }
        return Task.WhenAll(tasks);
    }

    return EnumerateSourceAsync().ContinueWith(t =>
    {
        // Clean up
        Task.WaitAll(timers.Select(t => t.DisposeAsync().AsTask()).ToArray());
        cts.Dispose();
        semaphore.Dispose();
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default).Unwrap().Unwrap();
}
Usage example:
int[] results = await Parallel_ForEachAsync(Enumerable.Range(1, 100), async (n, ct) =>
{
    await Task.Delay(500, ct); // Simulate some asynchronous I/O-bound operation
    return n;
}, maxActionsPerTimeUnit: 30, timeUnit: TimeSpan.FromSeconds(1.0));
The Parallel_ForEachAsync method asynchronously propagates all the errors that might be thrown by the body delegate, in exactly the same way as the built-in Parallel.ForEachAsync API.
For an alternative implementation that compiles on .NET platforms older than .NET 6, and that also features different options (includeAsynchronousDuration, onErrorContinue and executeOnCapturedContext, but lacks cancellationToken), see the 8th revision of this answer.
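The core idea (acquire a semaphore slot before starting an operation, and give the slot back one time unit later no matter how long the operation runs) can also be shown in isolation. The following is only a minimal sketch under stated assumptions: SimpleRateLimiter is a hypothetical name, it is not the RateLimiter class from the linked answer, and it uses Task.Delay instead of a Timer for brevity. The effective rate is subject to the resolution of the system timer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not the RateLimiter class from the linked answer.
public sealed class SimpleRateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public SimpleRateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    // Completes when the caller is allowed to start one operation.
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Give the slot back one time unit after it was acquired,
        // regardless of how long the caller's operation takes.
        _ = ReleaseLaterAsync();
    }

    private async Task ReleaseLaterAsync()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}
```

A caller would create one limiter, for example new SimpleRateLimiter(30, TimeSpan.FromSeconds(1)), and await limiter.WaitAsync() immediately before starting each operation. Unlike Parallel_ForEachAsync above, this sketch does no error propagation or cleanup.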
Upvotes: 1
Reputation: 131364
User code should never have to control how tasks are scheduled directly. For one thing, it can't: controlling how tasks run is the job of the TaskScheduler. When user code calls .Start(), it simply adds the task to a thread pool queue for execution. await doesn't start anything; it only waits for tasks that are already executing.
The TaskScheduler samples show how to create limited concurrency schedulers, but again, there are better, high-level options.
The question's code doesn't throttle the queued tasks anyway; it only limits how many of them can be awaited. They are all running already. This is similar to batching the previous asynchronous operation in a pipeline, allowing only a limited number of messages to pass to the next level.
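The point that the tasks are already running can be checked with a small experiment. The sketch below is illustrative only: HotTaskDemo and WorkAsync are hypothetical names, and WorkAsync stands in for whatever the real tasks do. It counts how many operations have started right after building a list of tasks, and right after building a list of delegates that have not been invoked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class HotTaskDemo
{
    private static int _started;

    // Hypothetical stand-in for the real work.
    private static async Task WorkAsync()
    {
        Interlocked.Increment(ref _started);
        await Task.Delay(100);
    }

    public static async Task<(int AfterTasks, int AfterFactories)> RunAsync()
    {
        _started = 0;
        // Calling an async method starts it, so all 5 operations are running here.
        List<Task> tasks = Enumerable.Range(0, 5).Select(_ => WorkAsync()).ToList();
        int afterTasks = _started;

        _started = 0;
        // A delegate is only a recipe for work; nothing runs until it is invoked.
        List<Func<Task>> factories = Enumerable.Range(0, 5)
            .Select(_ => new Func<Task>(WorkAsync)).ToList();
        int afterFactories = _started;

        await Task.WhenAll(tasks);
        return (afterTasks, afterFactories);
    }
}
```

RunAsync returns (5, 0): every task in the list has started before anything awaits it, while the delegates have started nothing. To throttle when work starts, a method therefore needs to receive delegates (or source items plus a body), not a List<Task>.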
ActionBlock with delay
The easy, out-of-the-box way would be to use an ActionBlock with a limited MaxDegreeOfParallelism, to ensure no more than N operations can run concurrently. If we know how long each operation takes, we can add a bit of delay to ensure we don't overshoot the throttle limit.
In this case, 7 concurrent workers each perform at most 4 requests per second (one every 250 ms at the fastest), for a maximum of 28 requests per second. The BoundedCapacity means that only up to 7 items will be stored in the input buffer before downloader.SendAsync starts waiting. This way we avoid flooding the ActionBlock if the operations take too long.
var downloader = new ActionBlock<string>(
    async url => {
        await Task.Delay(250);
        var response = await httpClient.GetStringAsync(url);
        //Do something with it.
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity = 7 }
);

//Start posting to the downloader
foreach (var item in urls)
{
    await downloader.SendAsync(item);
}
downloader.Complete();
await downloader.Completion;
ActionBlock with SemaphoreSlim
Another option would be to combine this with a SemaphoreSlim that gets reset periodically by a timer.
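var semaphore = new SemaphoreSlim(30, 30);
// Every second, top the semaphore back up to 30 permits without exceeding its maximum.
var refreshTimer = new Timer(_ =>
{
    var missing = 30 - semaphore.CurrentCount;
    if (missing > 0) semaphore.Release(missing);
});
var downloader = new ActionBlock<string>(
    async url => {
        // Take one permit per request. It is not released here; only the timer refills permits.
        await semaphore.WaitAsync();
        var response = await httpClient.GetStringAsync(url);
        //Do something with it.
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 }
);
//Start the timer right before we start posting
refreshTimer.Change(1000, 1000);
foreach(....)
{
}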
Upvotes: 1
Reputation: 529
Is the blocking due to some server/firewall/hardware limit, or is it based on observation?
You should try to use BlockingCollection<Task> or a similar thread-safe collection, especially if your tasks are I/O-bound. You can even set the capacity to 30:
var collection = new BlockingCollection<Task>(30);
Then you can start two async methods:
var population = Task.Run(Populate);
var processing = Task.Run(Dequeue);
await Task.WhenAll(population, processing);

void Populate()
{
    foreach (...)
        collection.Add(...);
    collection.CompleteAdding();
}

async Task Dequeue()
{
    while (!collection.IsCompleted)
        await collection.Take(); //consider using TryTake()
}
If the limit persists due to some true limitation (which should be very rare), change Populate() as follows:
var stopper = Stopwatch.StartNew();
for (var i = ....) //instead of foreach
{
    if (i > 0 && i % 30 == 0)
    {
        // Read the elapsed time once, so the check and the wait use the same value.
        var remaining = 1000 - (int)stopper.ElapsedMilliseconds;
        if (remaining > 0)
            Thread.Sleep(remaining); // Populate() runs synchronously, so block instead of awaiting
        stopper.Restart();
    }
    collection.Add(...);
}
collection.CompleteAdding();
Upvotes: 0
Reputation: 10929
This is the snippet:
var tasks = new List<Task>();
foreach (var item in listNeedInsert)
{
    var task = TaskToRun(item);
    tasks.Add(task);
    if (tasks.Count == 100)
    {
        await Task.WhenAll(tasks);
        tasks.Clear();
    }
}
// Wait for anything left to finish
await Task.WhenAll(tasks);
Notice that I add each task to a List<Task> instead, and once a batch has been added, I await all the tasks in that same List<Task>.
What you do here:
var tasks = taskList.Select(async task =>
{
    do
    {
        sem.WaitOne();
    }
    while (timeList.Count <= maxIntervalCount
        || DateTime.Now.Subtract(TimeSpan.FromSeconds(timeIntervalSeconds)) > timeList[timeList.Count - maxIntervalCount]);
    await task;
is blocking until the task finishes its work, thus making this call:
Task.WhenAll(tasks).Wait();
completely redundant. Furthermore, the line Task.WhenAll(tasks).Wait(); blocks unnecessarily on the WhenAll method instead of awaiting it.
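The snippet above batches by count only, so by itself it does not enforce a per-second limit. One possible way to adapt it to the question is to also wait out the rest of the interval before starting the next batch. This is a sketch under assumptions, not a drop-in solution: BatchThrottle and RunInBatchesAsync are hypothetical names, taskToRun plays the role of TaskToRun above, and the timing is only as accurate as the system timer.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class BatchThrottle
{
    // Starts at most batchSize operations per interval, batch by batch.
    public static async Task RunInBatchesAsync<T>(
        IEnumerable<T> items, Func<T, Task> taskToRun, int batchSize, TimeSpan interval)
    {
        var tasks = new List<Task>(batchSize);
        var sinceLastStart = new Stopwatch();
        foreach (T item in items)
        {
            tasks.Add(taskToRun(item)); // invoking the delegate starts the operation
            sinceLastStart.Restart();   // remember when the latest operation started
            if (tasks.Count == batchSize)
            {
                await Task.WhenAll(tasks);
                tasks.Clear();
                // Wait out the rest of the interval before starting the next batch.
                TimeSpan remaining = interval - sinceLastStart.Elapsed;
                if (remaining > TimeSpan.Zero) await Task.Delay(remaining);
            }
        }
        // Wait for anything left to finish.
        await Task.WhenAll(tasks);
    }
}
```

With batchSize: 30 and interval: TimeSpan.FromSeconds(1), the next batch starts no sooner than one second after the last operation of the previous batch started. The trade-off is throughput: one slow operation holds up the whole next batch, which the semaphore-based answers avoid.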
Upvotes: 0