Reputation: 343
I'm trying to create an async ProducerConsumerCollection, based on the example at the bottom of this MSDN page (http://msdn.microsoft.com/en-us/library/hh873173.aspx).
I'm now trying to add a timeout. Here is what I have so far:
    public async Task<T> TakeWithTimeout(int timeout)
    {
        Task<T> takeTask = this.Take();
        // Race the Take() task itself against the delay.
        if (timeout <= 0 || takeTask == await Task.WhenAny(takeTask, Task.Delay(timeout)))
        {
            return await takeTask;
        }
        else
        {
            // Timeout
            return default(T);
        }
    }
The problem with this code is that, on timeout, it does not cancel the task created by the Take() method.
Since this task was "created" by a TaskCompletionSource, I cannot pass it a CancellationToken, can I?
So how should I cancel it and properly implement this Take with a timeout?
Thanks :)
Upvotes: 4
Views: 10114
Reputation: 1688
A potentially slightly less complicated/more flexible way to do it would be to have the TaskCompletionSource code do its thing, and then, at the point where you're ready to return the task, "attach" the cancellation to it using a technique similar to Bernie's answer.
Extension Methods:
    using System.Threading;
    using System.Threading.Tasks;

    public static class TaskExtensions
    {
        /// <summary>
        /// Apply cancellation support to an arbitrary task
        /// </summary>
        public static async Task OrCancelledBy(this Task task, CancellationToken cancellationToken) =>
            // The second await observes the winning task, so cancellation and faults propagate
            await await Task.WhenAny(
                task,
                Task.Delay(-1, cancellationToken) // Wait forever until cancellation
            );

        /// <summary>
        /// Apply cancellation support to an arbitrary task that returns TResult
        /// </summary>
        public static async Task<TResult> OrCancelledBy<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
        {
            async Task<TResult> WaitUntilCancelled(CancellationToken ct)
            {
                await Task.Delay(-1, ct); // Wait forever until cancellation
                return default!; // Never reached: the delay only ends by throwing
            }
            return await await Task.WhenAny(task, WaitUntilCancelled(cancellationToken));
        }
    }
Usage:
    async Task TestAsync(CancellationToken ct)
    {
        var tcs = new TaskCompletionSource<bool>();
        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }
        await tcs.Task.OrCancelledBy(ct);
    }
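To get from a token to the timeout the question asks about, one option is a CancellationTokenSource that cancels itself after a delay. The following is only a sketch under some assumptions: the class name TimeoutUsageSketch and the method WaitOrDefault are made up for illustration, and OrCancelledBy is re-implemented here as a small variant (using a token registration instead of Task.Delay) so that the block compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutUsageSketch
{
    // Variant of OrCancelledBy: completes with the task's outcome,
    // or throws OperationCanceledException once the token fires.
    public static async Task<T> OrCancelledBy<T>(this Task<T> task, CancellationToken ct)
    {
        var cancelled = new TaskCompletionSource<T>();
        using (ct.Register(() => cancelled.TrySetCanceled(ct)))
        {
            return await await Task.WhenAny(task, cancelled.Task).ConfigureAwait(false);
        }
    }

    // Hypothetical wrapper: the token cancels itself after 'timeout',
    // and a timeout is reported as default(T), as in the question.
    public static async Task<T> WaitOrDefault<T>(Task<T> task, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                return await task.OrCancelledBy(cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return default(T);
            }
        }
    }
}
```

Note that this only abandons the wait. The underlying TaskCompletionSource task stays pending, so in a producer/consumer collection an abandoned Take() could still receive the next item that is added.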
Upvotes: 0
Reputation: 2598
I'm just going to post my solution to the question "How to cancel a task from a TaskCompletionSource", because that is what I needed myself.
I'm guessing this could be used for your specific need, but it's not tied to any particular timeout functionality, so this is a general solution (or so I hope).
This is an extension method:
    // Must be declared inside a static class.
    // Returns when tcs.Task completes or ctok is cancelled; it does not throw on cancellation.
    public static async Task WaitAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {
        CancellationTokenSource cts = null;
        CancellationTokenSource linkedCts = null;
        try {
            cts = new CancellationTokenSource();
            linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctok);
            var exitTok = linkedCts.Token;
            // Supervisory task: sleeps until exitTok is cancelled
            Func<Task> listenForCancelTaskFnc = async () => {
                await Task.Delay(-1, exitTok).ConfigureAwait(false);
            };
            var cancelTask = listenForCancelTaskFnc();
            await Task.WhenAny(new Task[] { tcs.Task, cancelTask }).ConfigureAwait(false);
            cts.Cancel(); // Make sure the supervisory task ends
        } finally {
            if (linkedCts != null) linkedCts.Dispose();
            if (cts != null) cts.Dispose();
        }
    }
Usage:
    async Task TestAsync(CancellationToken ctok) {
        var tcs = new TaskCompletionSource<bool>();
        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }
        await tcs.WaitAsync(ctok);
    }
The idea is to have a supervisory async Task that waits essentially forever until it is cancelled, which lets us 'exit early' when the TaskCompletionSource is not yet satisfied but we need to exit anyway because of a cancel request.
The supervisory Task is guaranteed to be cancelled by the end of WaitAsync, regardless of how it falls out of the WhenAny. Either the TaskCompletionSource is satisfied with a result and WhenAny completes, briefly leaving the supervisory sleeper task intact until the next line, where cts.Cancel() is called, or it was cancelled through exitTok, a linked token that fires when either the passed-in ctok or the internal cts.Token is cancelled.
Anyhow, I hope this makes sense -- please let me know if this code has any problems...
Upvotes: 2
Reputation: 456407
Writing a cancel-safe, async-friendly producer/consumer collection is non-trivial. What you need to do is change Take to accept a CancellationToken as a parameter, and have it register a handler so that when the token is cancelled, the TaskCompletionSource is cancelled too.
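As a rough illustration of that idea, here is a minimal sketch. It is not the MSDN sample and not production-ready code: the class name CancellableAsyncQueue and all of its members are made up for this example, and it ignores concerns such as bounding or completion. Take registers a callback on the token that cancels the waiter's TaskCompletionSource, and Add skips waiters that were already cancelled so no item is lost.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; names are illustrative only.
public sealed class CancellableAsyncQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _waiters =
        new Queue<TaskCompletionSource<T>>();

    public void Add(T item)
    {
        lock (_gate)
        {
            // Hand the item to the first waiter that is still pending;
            // TrySetResult returns false for waiters that were cancelled.
            while (_waiters.Count > 0)
            {
                if (_waiters.Dequeue().TrySetResult(item)) return;
            }
            _items.Enqueue(item);
        }
    }

    public Task<T> Take(CancellationToken cancellationToken = default)
    {
        lock (_gate)
        {
            if (_items.Count > 0) return Task.FromResult(_items.Dequeue());
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCanceled<T>(cancellationToken);

            var tcs = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            // When the token fires, cancel this waiter's task.
            var registration = cancellationToken.Register(
                () => tcs.TrySetCanceled(cancellationToken));
            // Drop the registration once the waiter completes either way.
            tcs.Task.ContinueWith(_ => registration.Dispose(), TaskScheduler.Default);
            _waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }

    // Mirrors the question: timeout <= 0 waits forever, a timeout yields default(T).
    public async Task<T> TakeWithTimeout(int timeoutMilliseconds)
    {
        if (timeoutMilliseconds <= 0) return await Take().ConfigureAwait(false);
        using (var cts = new CancellationTokenSource(timeoutMilliseconds))
        {
            try { return await Take(cts.Token).ConfigureAwait(false); }
            catch (OperationCanceledException) { return default(T); }
        }
    }
}
```

One known limitation of this sketch: a cancelled waiter stays in the internal queue until the next Add removes it.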
I highly recommend you use BufferBlock<T>, which has cancellation support built-in.
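For example, a take-with-timeout on top of BufferBlock<T> might look roughly like the sketch below. The wrapper class BufferBlockTimeoutSketch and its TakeWithTimeout method are hypothetical names; ReceiveAsync(CancellationToken) is the Dataflow extension method that does the actual work. On .NET Framework, Dataflow comes from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper; names are illustrative only.
public static class BufferBlockTimeoutSketch
{
    // Returns the next item, or default(T) if none arrives within 'timeout'.
    public static async Task<T> TakeWithTimeout<T>(BufferBlock<T> queue, TimeSpan timeout)
    {
        // The token source cancels itself once the timeout has elapsed.
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                // Cancelling the token cancels the pending receive itself,
                // so no orphaned consumer is left waiting on the block.
                return await queue.ReceiveAsync(cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return default(T);
            }
        }
    }
}
```

Producers would add items with queue.Post(item) or await queue.SendAsync(item).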
If you can't use TPL Dataflow (e.g., you're working in a PCL or have target platforms unsupported by Dataflow), then you can use the producer/consumer collections in my open-source AsyncEx library (such as AsyncProducerConsumerQueue or AsyncCollection). These are both based on AsyncLock and AsyncConditionVariable, a design I describe briefly on my blog (which does not get into the cancellation details). The key behind supporting cancellation in a producer/consumer collection with this design is to support cancellation in AsyncConditionVariable.WaitAsync; once your condition variable type supports cancellation, then your collection will easily support it, too.
Upvotes: 8