Tiago Dall'Oca

Reputation: 349

How to chain async tasks with a CancellationToken?

I'd like to chain some tasks, continuing with the next one only if my CancellationToken hasn't been cancelled.

What I'm aiming to achieve is something equivalent to

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    if (cancellationToken.IsCancellationRequested) return;
    await t1();
    if (cancellationToken.IsCancellationRequested) return;
    await t2();
    if (cancellationToken.IsCancellationRequested) return;
    await t3();
    if (cancellationToken.IsCancellationRequested) return;
    await t4();
});
var timeout = Task.Delay(TimeSpan.FromSeconds(4));
var completedTask = await Task.WhenAny(t, timeout);
if (completedTask != t)
{
    cts.Cancel();
    await t;
}

That's what I have so far. It works, but it is verbose.

Upvotes: 3

Views: 1510

Answers (6)

Miral

Reputation: 13020

Your original code is correct-ish -- it assumes that you always want the individual tasks to run to completion, and that if cancelled you want the overall task to complete with success. Neither of these is idiomatic.

A more normal way to do it would be something like:

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    cancellationToken.ThrowIfCancellationRequested();
    await t1(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t2(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t3(cancellationToken);
    cancellationToken.ThrowIfCancellationRequested();
    await t4(cancellationToken);
}, cancellationToken);

Then, somewhere else:

cts.Cancel();

You can omit the calls to ThrowIfCancellationRequested here, assuming that the individual tasks check the token fairly soon after entry. The core idea is that you should pass the token down to the innermost loop of whatever is doing the work, and that code should throw on cancellation by calling ThrowIfCancellationRequested, which puts the task into a cancelled state rather than a success state.

(So really you only need to call ThrowIfCancellationRequested yourself if you hit a function that doesn't accept a CancellationToken parameter -- which is why all async methods should accept one, as otherwise their tasks will be non-cancellable.)
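
As a rough illustration of the point about the cancelled state, here is a minimal sketch. Step is a hypothetical stand-in for t1..t4 (not from the question), and the 200 ms / 500 ms timings are arbitrary assumptions. Each step forwards the token to Task.Delay, so cancellation interrupts the step in flight and the outer task should end up Canceled rather than RanToCompletion.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(500)); // assumed timeout, shorter than the whole chain
var token = cts.Token;

// The token is passed both to Task.Run and to every step.
Task chain = Task.Run(async () =>
{
    await Step("t1", token);
    await Step("t2", token);
    await Step("t3", token);
    await Step("t4", token);
}, token);

try
{
    await chain;
}
catch (OperationCanceledException)
{
    // The exception carries the same token, so the task is marked as cancelled.
    Console.WriteLine($"Chain status: {chain.Status}");
}

// Hypothetical step: Task.Delay observes the token and throws when it is cancelled.
static async Task Step(string name, CancellationToken token)
{
    await Task.Delay(TimeSpan.FromMilliseconds(200), token);
    Console.WriteLine($"{name} finished");
}
```

With these timings the first two steps would normally finish before the token fires, and the remaining ones are never started.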

Upvotes: 1

Tiago Dall'Oca

Reputation: 349

Since no one here gave a better answer I'll do it myself.

The code I provided in the question is, in my opinion, the most cohesive option and also general enough to be reused in other cases.

Upvotes: -1

Enigmativity

Reputation: 117019

You should consider using Microsoft's Reactive Framework (aka Rx): install the System.Reactive NuGet package and add using System.Reactive.Linq; (plus using System.Reactive; for Unit). Then you can do this:

IObservable<Unit> tasks =
    from x1 in Observable.FromAsync(() => t1())
    from x2 in Observable.FromAsync(() => t2())
    from x3 in Observable.FromAsync(() => t3())
    from x4 in Observable.FromAsync(() => t4())
    select Unit.Default;

IObservable<Unit> timer =
    Observable
        .Timer(TimeSpan.FromSeconds(4.0))
        .Select(x => Unit.Default);

IDisposable subscription =
    Observable
        .Amb(tasks, timer)
        .Subscribe();

If the timer observable fires before the tasks are complete then the entire pipeline is canceled. No unnecessary tasks are run.

If you want to cancel manually then just call subscription.Dispose().

The code is simple and beautiful too.

Upvotes: 0

Theodor Zoulias

Reputation: 43400

Your code is functionally OK, but when reading it at a glance it's not clear what it's doing. So I suggest that you encapsulate this logic inside a utility method with a descriptive name and parameters:

public static async Task RunSequentially(IEnumerable<Func<Task>> taskFactories,
    int timeout = Timeout.Infinite, bool onTimeoutAwaitIncompleteTask = false)
{
    using (var cts = new CancellationTokenSource(timeout))
    {
        if (onTimeoutAwaitIncompleteTask)
        {
            await Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    if (cts.IsCancellationRequested) throw new TimeoutException();
                    await taskFactory();
                }
            });
        }
        else // On timeout return immediately
        {
            var allSequentially = Task.Run(async () =>
            {
                foreach (var taskFactory in taskFactories)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    var task = taskFactory(); // Synchronous part of task
                    cts.Token.ThrowIfCancellationRequested();
                    await task; // Asynchronous part of task
                }
            }, cts.Token);
            var timeoutTask = new Task(() => {}, cts.Token);
            var completedTask = await Task.WhenAny(allSequentially, timeoutTask);
            if (completedTask.IsCanceled) throw new TimeoutException();
            await completedTask; // Propagate any exception
        }
    }
}

This code differs from yours in that it throws a TimeoutException on timeout. I think it's better to force the caller to handle this exception explicitly, instead of hiding the fact that the operation timed out. The caller can ignore the exception by leaving the catch block empty:

try
{
    await RunSequentially(new[] { t1, t2, t3, t4 },
        timeout: 4000,
        onTimeoutAwaitIncompleteTask: true);
}
catch (TimeoutException)
{
    // Do nothing
}

Upvotes: 0

Gabriel Luci

Reputation: 40858

It seems like your goal is to just stop execution if the whole operation has taken longer than 4 seconds.

If you were passing the CancellationToken to your t1/t2/etc. methods, I'd say you couldn't do any better than what you have. But as you have it, you could just use a Stopwatch instead of a CancellationToken:

var timeout = TimeSpan.FromSeconds(4);
var stopwatch = new Stopwatch();
stopwatch.Start();

await t1();
if (stopwatch.Elapsed > timeout) return;
await t2();
if (stopwatch.Elapsed > timeout) return;
await t3();
if (stopwatch.Elapsed > timeout) return;
await t4();
stopwatch.Stop();

I assume this is in a method somewhere, where you can use return, but you can modify if needed (return a value, throw an exception, etc.).

Upvotes: 0

Claudiu Guiman

Reputation: 867

var cts = new CancellationTokenSource();
var t = Task.Run(async () => {
    await t1();
    await t2();
    await t3();
    await t4();
}, cts.Token);

cts.CancelAfter(TimeSpan.FromSeconds(4));

try
{
    await t;
}
catch (OperationCanceledException)
{
    // The cancellation token was triggered, add logic for that
    ....
}
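
One thing that may be worth checking with this approach: the token passed to Task.Run is only consulted before the delegate starts, so steps that don't take the token themselves would be expected to keep running after the timeout. The sketch below tries to show that. StepIgnoringToken is a hypothetical stand-in for t1..t4, and the timings are arbitrary assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(150)); // assumed timeout, shorter than the whole chain

int stepsRun = 0;

// Hypothetical step that never looks at the token, like t1..t4 above.
async Task StepIgnoringToken()
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    stepsRun++;
}

// The token is handed to Task.Run only, not to the steps.
Task t = Task.Run(async () =>
{
    await StepIgnoringToken();
    await StepIgnoringToken();
    await StepIgnoringToken();
}, cts.Token);

// Assuming the delegate started before the token fired (the normal case),
// no OperationCanceledException is thrown here.
await t;
Console.WriteLine($"Status: {t.Status}, steps run: {stepsRun}");
```

If the steps should stop at the timeout, the token would also need to be passed into them, or checked between them.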

Upvotes: 1
