tbddeveloper

Reputation: 2447

Multiple Long Running Tasks in Parallel

I'm trying to create a service that consumes many topics from Kafka. I want to create a Task for each consumer, which means these are long-running tasks.

In each task, I run a loop like this:

while (!token.IsCancellationRequested)
{
    var result = await consumer.Consume(token);
}

When I then try to start multiple tasks, the first one starts, but the others never do.

I thought I was looking for best practices on how to correctly manage tasks (it's not something I've done a lot of) and posted on Software Engineering, but I was told there's a bug and I should post on here.

To see this in action:

async Task Main()
{
    var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    
    await Task.WhenAll(Consumer1(cancellationTokenSource.Token), Consumer2(cancellationTokenSource.Token));
    
    Console.WriteLine("Complete");
}

private async Task Consumer1(CancellationToken token)
{
    Console.WriteLine("Working on consumer 1");
    
    while (!token.IsCancellationRequested)
    {
        // Do a thing
        // await consumer.Consume() 
    }
    
    Console.WriteLine("Completed Consumer 1");

    await Task.CompletedTask;
}

private async Task Consumer2(CancellationToken token)
{
    Console.WriteLine("Completed consumer 2");
    
    await Task.CompletedTask;
}

The result will be:

00:00:00: Working on consumer 1 
00:00:05: Completed Consumer 1 
00:00:05: Completed consumer 2
00:00:05: Complete

If you put an await Task.Delay(1) in Consumer1, above the while, you get:

00:00:00: Working on consumer 1
00:00:00: Completed consumer 2
00:00:05: Completed Consumer 1
00:00:05: Complete

The actual code does contain await consumer.Consume(token), but that's not enough to run the tasks in parallel, and the behavior tells me I'm missing the correct usage somewhere. So my first question is: am I using Tasks completely incorrectly here, i.e., is there a best practice for what I'm trying to do? And secondly, what is the bug I am missing in the code?

For reference, per @Jakoss's comment, this will run the tasks as expected:

var tasks = new List<Task>();

tasks.Add(Task.Run(async () => await Consumer1(cancellationTokenSource.Token)));
tasks.Add(Task.Run(async () => await Consumer2(cancellationTokenSource.Token)));

await Task.WhenAll(tasks.ToArray());

Is this the correct way to do this?

Upvotes: 1

Views: 1366

Answers (2)

Stephen Cleary

Reputation: 457177

"Only if I put the Task.Delay(1) in does it even start the second task. I want both consumers to be running, and the only way I get that behavior is with the Task.Delay which makes me feel I'm doing something poorly or misunderstanding how this all works."

All methods (including async methods) begin executing synchronously. Also, if an await is used on an awaitable that is already complete, it will continue executing synchronously. In the example code, since the await consumer.Consume() is commented out, the entire method runs synchronously.
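A minimal sketch of that behavior, not taken from the question's code: the class name, the method names, and the Log list are all made up for illustration. One method awaits an already-completed task and so runs to the end before its caller continues; the other awaits a task that is not yet complete and so returns control to the caller at the await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncStartDemo
{
    // Records the order in which things happen (illustrative only).
    public static readonly List<string> Log = new List<string>();

    // No incomplete await: the whole body runs before the caller gets the Task back.
    private static async Task CompletesSynchronously()
    {
        Log.Add("A: start");
        await Task.CompletedTask; // already complete, so execution continues synchronously
        Log.Add("A: end");
    }

    // Awaits an incomplete task: control returns to the caller at the await.
    private static async Task YieldsToCaller(Task gate)
    {
        Log.Add("B: start");
        await gate;
        Log.Add("B: end");
    }

    public static async Task Main()
    {
        var gate = new TaskCompletionSource<bool>();

        Task a = CompletesSynchronously();
        Log.Add("caller: after A"); // A has already finished at this point

        Task b = YieldsToCaller(gate.Task);
        Log.Add("caller: after B"); // B is suspended at its await

        gate.SetResult(true);       // lets B resume
        await Task.WhenAll(a, b);

        Console.WriteLine(string.Join(Environment.NewLine, Log));
    }
}
```

The log comes out as A: start, A: end, caller: after A, B: start, caller: after B, B: end. Consumer1 in the question behaves like method A, which is why Consumer2 is not even called until the loop exits.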

If you are sure that consumer.Consume will behave asynchronously, then all you have to do is uncomment it. If, however, it may not behave asynchronously (e.g., if a message has already arrived to consume), then you will probably want to use Task.Run to start the method on a background thread.
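As a rough sketch of the Task.Run option: ConsumeAsync below is a hypothetical stand-in that always completes synchronously (as if a message were always waiting); it is not a real Kafka API, and the class name and Started counter are likewise only for illustration. Because each loop is started through Task.Run, neither one can keep the caller from starting the other.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskRunDemo
{
    // Counts how many consumer loops were entered (illustrative only).
    public static int Started;

    // Hypothetical stand-in for a consume call that never behaves asynchronously.
    private static Task<int> ConsumeAsync(CancellationToken token)
    {
        return Task.FromResult(1);
    }

    private static async Task ConsumerLoop(string name, CancellationToken token)
    {
        Interlocked.Increment(ref Started);
        Console.WriteLine(name + " started");

        while (!token.IsCancellationRequested)
        {
            await ConsumeAsync(token); // completes synchronously, so the loop never yields
        }

        Console.WriteLine(name + " stopped");
    }

    public static async Task Main()
    {
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)))
        {
            // Task.Run queues each loop to the thread pool, so the caller
            // is never blocked by a loop that runs synchronously.
            await Task.WhenAll(
                Task.Run(() => ConsumerLoop("consumer 1", cts.Token)),
                Task.Run(() => ConsumerLoop("consumer 2", cts.Token)));
        }

        Console.WriteLine("Complete");
    }
}
```

Note that the loops in this sketch spin at full speed for one second because the stand-in never waits; a real consumer would normally spend most of that time waiting for messages.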

Upvotes: 5

Johnathan Barclay

Reputation: 20373

For Consumer2 to run before Consumer1 completes, Consumer1 would have to relinquish the current thread at some point (and schedule a continuation for the rest of the method).

This is usually when I/O takes place, or some other event-driven process. Maybe this would happen when you call await consumer.Consume()? It certainly does when you await Task.Delay(1), which is why your second example produces the output stated in the question.

The alternative would be to explicitly push the execution of Consumer1 onto its own thread, which can be done using Task.Run:

await Task.WhenAll(
    Task.Run(() => Consumer1(cancellationTokenSource.Token)), 
    Consumer2(cancellationTokenSource.Token));

However, this would now use an extra thread, effectively "faking" async behaviour.

Upvotes: 1
