Reputation: 2447
Trying to create a service to consume many topics from Kafka. I want to create a Task for each consumer, which means these are long running tasks.
In each task, if I do;
while(!token.IsCancellationRequested)
{
var result = await consumer.Consume(token);
}
Then if I try and start multiple tasks, the first one starts, but others don't continue.
I thought I was looking for best practices on how to correctly manage tasks (it's not something I've done a lot of) and posted on Software Engineering, but I was told there's a bug and I should post on here.
To see this work;
async Task Main()
{
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await Task.WhenAll(Consumer1(cancellationTokenSource.Token), Consumer2(cancellationTokenSource.Token));
Console.WriteLine("Complete");
}
private async Task Consumer1(CancellationToken token)
{
Console.WriteLine("Working on consumer 1");
while (!token.IsCancellationRequested)
{
// Do a thing
// await consumer.Consume()
}
Console.WriteLine("Completed Consumer 1");
await Task.CompletedTask;
}
private async Task Consumer2(CancellationToken token)
{
Console.WriteLine("Completed consumer 2");
await Task.CompletedTask;
}
The result will be;
00:00:00: Working on consumer 1
00:00:05: Completed Consumer 1
00:00:05: Completed consumer 2
00:00:05: Complete
If you put a await Task.Delay(1) in Consumer1, above the while, you get;
00:00:00: Working on consumer 1
00:00:00: Completed consumer 2
00:00:05: Completed Consumer 1
00:00:05: Complete
The actual code does contain await consumer.Consume(token)
but that's not enough to run the tasks in parallel and the behavior tells me I'm missing the correct usage somewhere. So, my first question was, am I using Tasks completely incorrectly here, i.e is there a best practice for how to achieve what I'm trying to do. And secondly, what is the bug I am missing in the code?
For reference per @Jakoss comment, this will run the tasks as expected;
var tasks = new List<Task>();
tasks.Add(Task.Run(async () => await Consumer1(cancellationTokenSource.Token)));
tasks.Add(Task.Run(async () => await Consumer2(cancellationTokenSource.Token)));
await Task.WhenAll(tasks.ToArray());
Is this the correct way to do this?
Upvotes: 1
Views: 1366
Reputation: 457177
Only if I put the Task.Delay(1) in does it even start the second task. I want both consumers to be running, and the only way I get that behavior is with the Task.Delay which makes me feel I'm doing something poorly or misunderstanding how this all works.
All methods (including async
methods) begin executing synchronously. Also, if an await
is used on an awaitable that is already complete, it will continue executing synchronously. In the example code, since the await consumer.Consume()
is commented out, the entire method runs synchronously.
If you are sure that consumer.Consume
will behave asynchronously, then all you have to do is uncomment it. If, however, it may not behave asynchronously (e.g., if a message has already arrived to consume), then you will probably want to use Task.Run
to start the method on a background thread.
Upvotes: 5
Reputation: 20373
For Consumer2
to run before Consumer1
completes, Consumer1
would have to relinquish the current thread at some point (and schedule a continuation for the rest of the method).
This is usually when I/O takes place, or some other event driven process. Maybe this would happen when you call await consumer.Consume()
? It certainly does when you await Task.Delay(1)
, and is why your second example produces the output stated in the question.
The alternative would be to explicitly push the execution of Consumer1
onto it's own thread, which can be done using Task.Run
:
await Task.WhenAll(
Task.Run(() => Consumer1(cancellationTokenSource.Token)),
Consumer2(cancellationTokenSource.Token));
However, this would now use an extra thread effectively "faking" async
behaviour.
Upvotes: 1