Reputation: 12091
I have an IEnumerable<Task<T>>, where T represents some event (an event in the everyday sense, not the C# event kind).
I want to process these asynchronously because they are IO-bound, and limit the amount of concurrency, because the database handling the events can't handle more than a handful (say 6) concurrent processing requests (they are quite heavy). What is the right strategy for doing this?
If I have
private Task processeventasync(T someevent) {
    ...
}

foreach (var t in tasks) {
    await processeventasync(await t);
}
I have no concurrency.
If I guard things with a semaphore, I'm really just blocking threads on a lock rather than awaiting asynchronously.
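For example, something along these lines (just a sketch, reusing the names above) parks a thread in Wait() whenever all 6 slots are taken:

var semaphore = new SemaphoreSlim(6);
foreach (var t in tasks) {
    semaphore.Wait(); // blocks the calling thread until a slot frees up
    Task.Run(async () => {
        try { await processeventasync(await t); }
        finally { semaphore.Release(); }
    });
}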
The LimitedConcurrencyLevelTaskScheduler from the example at https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx is also a thread/lock-based approach.
I've considered maintaining a queue of at most 6 tasks and making a WhenAny loop around that, but it feels like re-inventing the square wheel.
private List<Task> running = new List<Task>();

foreach (Task<T> task in tasks) {
    var inner = TaskExtensions.Unwrap(task.ContinueWith(tt => processeventasync(tt.Result)));
    running.Add(inner);
    if (running.Count >= 6) {
        var resulttask = await Task.WhenAny(running);
        running.Remove(resulttask);
        await resulttask;
        //not sure if this await will schedule the next iteration
        //of the loop asynchronously, or if the loop happily continues
        //and the continuation has the rest of the loop body (nothing
    }
}
What's the right way to go here?
EDIT:
SemaphoreSlim's WaitAsync seems very reasonable for this. I'm coming to the following strange-looking code:
private async void Foo()
{
    IEnumerable<Task<int>> tasks = gettasks();
    var resulttasks = tasks.Select(ti => TaskExtensions.Unwrap(ti.ContinueWith(tt => processeventasync(tt.Result))));
    var semaphore = new SemaphoreSlim(initialCount: 6);
    foreach (Task task in resulttasks)
    {
        await semaphore.WaitAsync();
        semaphore.Release();
    }
}
Having async void here is rather smelly, but it's an infinite loop; it will never return (actual processing would obviously have some cancellation mechanism). It looks really strange with just the await/release in the body, but it looks like that's actually right. Is this a reasonable approach, without hidden gotchas?
Upvotes: 5
Views: 929
Reputation: 149518
You can limit concurrency using SemaphoreSlim.WaitAsync.

"It looks really strange with just the await/release in the body, but it looks like that's actually right"

Your current approach doesn't really do anything: the tasks aren't affected by the SemaphoreSlim at all, since you invoke them all concurrently through Enumerable.Select. You'll need to acquire and release the semaphore inside the Select:
private const int ConcurrencyLimit = 6;
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(ConcurrencyLimit);

public async Task FooAsync()
{
    var tasks = GetTasks();
    var sentTasks = tasks.Select(async task =>
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            await ProcessEventAsync(await task);
        }
        finally
        {
            semaphoreSlim.Release();
        }
    });

    await Task.WhenAll(sentTasks);
}

private Task ProcessEventAsync(T someEvent)
{
    // Process event.
}
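Because WaitAsync returns a Task, waiting for a free slot doesn't block a thread; each lambda simply doesn't continue past the await until the semaphore grants it a slot. At any moment, at most ConcurrencyLimit of the ProcessEventAsync calls run concurrently, and Task.WhenAll completes once every event has been processed.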
Upvotes: 3
Reputation: 68640
You can use TPL Dataflow's ActionBlock<T>.
Define an action block that processes your events, and then post items to be processed to this block. You can also set the maximum degree of parallelism.
var block = new ActionBlock<string>(str =>
{
    // save in db
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 6
});

var sendings = new List<Task<bool>>
{
    block.SendAsync("a"),
    block.SendAsync("b"),
    block.SendAsync("c")
};

await Task.WhenAll(sendings);

block.Complete();       // tell the block we're done sending messages
await block.Completion; // wait for messages to be processed
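To connect this back to the IEnumerable<Task<T>> from the question, a rough sketch (GetTasks, SaveToDatabaseAsync and the event type T are placeholders for your own code) could await each source task and feed its result into the block:

// Requires the System.Threading.Tasks.Dataflow NuGet package.
var block = new ActionBlock<T>(async ev =>
{
    await SaveToDatabaseAsync(ev); // the heavy IO-bound work
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 6     // at most 6 events processed at once
});

foreach (Task<T> task in GetTasks())
{
    await block.SendAsync(await task);
}

block.Complete();       // no more events will arrive
await block.Completion; // wait until everything queued has been processed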
Upvotes: 1