Nisalon

Reputation: 343

How to cancel a task from a TaskCompletionSource?

I'm trying to create an async ProducerConsumerCollection, based on the sample at the bottom of this MSDN page: http://msdn.microsoft.com/en-us/library/hh873173.aspx

I'm now trying to add a timeout. Here is what I have:

    public async Task<T> TakeWithTimeout(int timeout)
    {
        Task<T> takeTask = this.Take();

        // Race the task we intend to return against the delay
        if (timeout <= 0 || takeTask == await Task.WhenAny(takeTask, Task.Delay(timeout)))
        {
            return await takeTask;
        }
        else
        {
            // Timeout: takeTask is still pending at this point
            return default(T);
        }
    }

The problem with this code is that, when the timeout fires, the task created by the Take() method is not cancelled.

Since this task was "created" by a TaskCompletionSource, I can't pass it a CancellationToken.

So how can I cancel it and properly implement this Take with a timeout?

Thanks :)

Upvotes: 4

Views: 10114

Answers (3)

Lee Oades

Reputation: 1688

A slightly simpler and more flexible approach is to let the TaskCompletionSource code do its thing and then, at the point where you're ready to return the task, "attach" cancellation to it using a technique similar to Bernie's answer.

Extension Methods:

    using System.Threading;
    using System.Threading.Tasks;

    public static class TaskExtensions
    {
        /// <summary>
        /// Apply cancellation support to an arbitrary task
        /// </summary>
        public static async Task OrCancelledBy(this Task task, CancellationToken cancellationToken)
        {
            Task first = await Task.WhenAny(
                task,
                Task.Delay(-1, cancellationToken) // Wait forever until cancellation
            );
            await first; // Propagate cancellation or any exception from whichever task finished first
        }

        /// <summary>
        /// Apply cancellation support to an arbitrary task that returns TResult
        /// </summary>
        public static async Task<TResult> OrCancelledBy<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
        {
            async Task<TResult> WaitUntilCancelled(CancellationToken ct)
            {
                await Task.Delay(-1, ct); // Wait forever until cancellation
                return default!; // Never reached: the delay only ends by throwing
            }

            // The outer await unwraps the winner, returning its result or rethrowing its exception
            return await await Task.WhenAny(task, WaitUntilCancelled(cancellationToken));
        }
    }

Usage:

    async Task TestAsync(CancellationToken ct) 
    {
        var tcs = new TaskCompletionSource<bool>();

        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }

        await tcs.Task.OrCancelledBy(ct);
    }
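One caveat worth spelling out: this stops the wait, not the underlying task. Task.WhenAny does not cancel the task that loses the race, so the TaskCompletionSource stays pending and can still be completed later with nobody awaiting it. The sketch below illustrates that; AbandonedWaitDemo and SourceStillPendingAfterCancelAsync are hypothetical names, and the extension method is repeated only so the block compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AbandonedWaitDemo
{
    // Same idea as OrCancelledBy above, repeated so this sketch is self-contained.
    public static async Task<TResult> OrCancelledBy<TResult>(this Task<TResult> task, CancellationToken ct)
    {
        async Task<TResult> WaitUntilCancelled()
        {
            await Task.Delay(-1, ct); // Only ends by throwing on cancellation
            return default!;
        }

        return await await Task.WhenAny(task, WaitUntilCancelled());
    }

    // Returns true if the source was still pending after the wait was cancelled
    // and could then be completed by a later producer.
    public static async Task<bool> SourceStillPendingAfterCancelAsync()
    {
        var tcs = new TaskCompletionSource<int>();

        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)))
        {
            try
            {
                await tcs.Task.OrCancelledBy(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // The wait was abandoned; nothing told tcs about it.
            }
        }

        // A producer can still hand a value to the source, but no one observes it.
        return !tcs.Task.IsCompleted && tcs.TrySetResult(42);
    }
}
```

In a producer/consumer collection this means an item may be handed to a consumer that has already given up, which is why cancelling the TaskCompletionSource itself is the safer route when you control that code.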

Upvotes: 0

Bernie Habermeier

Reputation: 2598

I'm just going to post my solution to the question "How to cancel a task from a TaskCompletionSource", because that is what I needed myself.

I'm guessing this could be used for your specific need, but it's not tied to a specific timeout functionality, so this is a general solution (or so I hope).

This is an extension method:

    public static async Task WaitAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {
        // cts lets us stop the supervisory task ourselves; linkedCts fires when
        // either cts or the caller's token is cancelled. Both are disposed on exit.
        using (var cts = new CancellationTokenSource())
        using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctok))
        {
            var exitTok = linkedCts.Token;

            // Supervisory task: waits forever until exitTok is cancelled
            var cancelTask = Task.Delay(-1, exitTok);

            await Task.WhenAny(new Task[] { tcs.Task, cancelTask }).ConfigureAwait(false);

            // Make sure the supervisory task never outlives this call
            cts.Cancel();
        }

        // Note: returns normally in both cases; check tcs.Task or ctok to tell them apart
    }

Usage:

    async Task TestAsync(CancellationToken ctok) {

        var tcs = new TaskCompletionSource<bool>();

        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }

        await tcs.WaitAsync(ctok);

    }

The idea is to have a supervisory async Task that waits essentially forever until it is cancelled. We can use it to 'exit early' when the TaskCompletionSource is not yet satisfied but we need to exit anyway because of a cancel request.

The supervisory Task is guaranteed to be cancelled by the end of WaitAsync, regardless of how execution falls out of the WhenAny. Either the TaskCompletionSource is satisfied with a result and WhenAny completes, briefly leaving the supervisory sleeper task intact until cts.Cancel() is called on the next line, or the sleeper was cancelled through exitTok, a linked token that fires when either the passed-in ctok or the internal cts.Token is cancelled.
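Because WaitAsync returns normally in both cases, the caller has to work out which one happened. Here is a minimal sketch of one way to do that; WaitOrThrowAsync is a hypothetical helper, not part of the code above, and WaitAsync is condensed to a single linked source so the block stands on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TcsWaitExtensions
{
    // Condensed form of the WaitAsync idea: returns when the TaskCompletionSource
    // completes or the token fires, whichever happens first.
    public static async Task WaitAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ctok))
        {
            await Task.WhenAny(tcs.Task, Task.Delay(-1, cts.Token)).ConfigureAwait(false);
            cts.Cancel(); // Stop the supervisory delay in either case
        }
    }

    // Hypothetical caller-side helper: turns an early exit into an exception
    // and otherwise returns the result of the source.
    public static async Task<T> WaitOrThrowAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {
        await tcs.WaitAsync(ctok).ConfigureAwait(false);

        if (!tcs.Task.IsCompleted)
        {
            // WaitAsync only returns early when the token was cancelled
            ctok.ThrowIfCancellationRequested();
        }

        return await tcs.Task.ConfigureAwait(false);
    }
}
```

Whether to throw or return a sentinel on early exit is a design choice; throwing keeps the behaviour in line with most cancellable APIs in the framework.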

Anyhow, I hope this makes sense -- please let me know if this code has any problems...

Upvotes: 2

Stephen Cleary

Reputation: 456407

Writing a cancel-safe async-friendly producer/consumer collection is non-trivial. What you need to do is change Take to accept a CancellationToken as a parameter, and it should register a handler so that when it is cancelled the TaskCompletionSource is cancelled.
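As a rough illustration of that first suggestion, here is a minimal sketch of a collection whose Take registers a handler that cancels the TaskCompletionSource. It is not the MSDN sample or the AsyncEx implementation: CancellableAsyncCollection and its members are hypothetical names, and a production version would need more care (for example, cancelled waiters stay in the queue until the next Add).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class CancellableAsyncCollection<T>
{
    private readonly object gate = new object();
    private readonly Queue<T> items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> waiters = new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        lock (gate)
        {
            // Hand the item to the first waiter that has not been cancelled.
            // TrySetResult returns false for a waiter that was already cancelled.
            while (waiters.Count > 0)
            {
                if (waiters.Dequeue().TrySetResult(item)) return;
            }
            items.Enqueue(item);
        }
    }

    public Task<T> Take(CancellationToken cancellationToken = default)
    {
        TaskCompletionSource<T> tcs;
        lock (gate)
        {
            if (items.Count > 0) return Task.FromResult(items.Dequeue());

            // Run continuations asynchronously so completing tcs under the lock is safe.
            tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
            waiters.Enqueue(tcs);
        }

        if (cancellationToken.CanBeCanceled)
        {
            // The handler cancels the source; whichever of Add and cancellation wins, the other becomes a no-op.
            var registration = cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
            // Drop the registration once the task finishes either way.
            tcs.Task.ContinueWith(_ => registration.Dispose(), TaskScheduler.Default);
        }
        return tcs.Task;
    }

    public async Task<T> TakeWithTimeout(int timeoutMilliseconds)
    {
        if (timeoutMilliseconds <= 0) return await Take().ConfigureAwait(false);

        using (var cts = new CancellationTokenSource(timeoutMilliseconds))
        {
            try { return await Take(cts.Token).ConfigureAwait(false); }
            catch (OperationCanceledException) { return default(T); } // Timeout
        }
    }
}
```

With this shape, a timed-out Take cancels its own TaskCompletionSource, so a later Add skips it and the item goes to the next live consumer or into the queue.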

I highly recommend you use BufferBlock<T>, which has cancellation support built-in.
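For the timeout in the question, a sketch along these lines should work with BufferBlock<T>. It assumes the System.Threading.Tasks.Dataflow assembly is referenced (a NuGet package on .NET Framework); BufferBlockExample and TakeWithTimeout are hypothetical names chosen to mirror the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockExample
{
    // Receives one item, or returns default(T) if none arrives within the timeout.
    public static async Task<T> TakeWithTimeout<T>(BufferBlock<T> queue, int timeoutMilliseconds)
    {
        if (timeoutMilliseconds <= 0)
        {
            // No timeout requested: wait until an item is available
            return await queue.ReceiveAsync().ConfigureAwait(false);
        }

        using (var cts = new CancellationTokenSource(timeoutMilliseconds))
        {
            try
            {
                // The token cancels the pending receive itself, not just our wait on it
                return await queue.ReceiveAsync(cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Timeout
                return default(T);
            }
        }
    }
}
```

Producers add items with queue.Post(item) or await queue.SendAsync(item).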

If you can't use TPL Dataflow (e.g., you're working in a PCL or have target platforms unsupported by Dataflow), then you can use the producer/consumer collections in my open-source AsyncEx library (such as AsyncProducerConsumerQueue or AsyncCollection). These are both based on AsyncLock and AsyncConditionVariable, a design I describe briefly on my blog (which does not get into the cancellation details). The key behind supporting cancellation in a producer/consumer collection with this design is to support cancellation in AsyncConditionVariable.WaitAsync; once your condition variable type supports cancellation, then your collection will easily support it, too.

Upvotes: 8
