MaYaN

Reputation: 6996

Why is the Exception not being handled in this Asynchronous Retry Wrapper using TPL?

I am on .NET 4 and have the following asynchronous "Retry" wrapper:

public static Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return Task.Factory.StartNew(work);
        } 
        catch (AggregateException ae)
        {
            ae.Handle(e =>
            {
                if (e is TException)
                {
                    // allow program to continue in this case
                    // do necessary logging or whatever
                    if (onException != null) { onException((TException)e); }

                    Thread.Sleep(retryInterval);
                    return true;
                }
                throw new RetryWrapperException("Unexpected exception occurred", ae);
            });
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}

Which I am trying to test using:

[TestFixture]
public class RetryWrapperTest
{
    private static int _counter;

    private Func<string> _work;
    private Action<InvalidOperationException> _onException;
    private TimeSpan _retryInterval;
    private int _maxRetryCount;

    [TestFixtureSetUp]
    public void SetUp()
    {
        _counter = 0;

        _work =  GetSampleResult;
        _onException = e => Console.WriteLine("Caught the exception: {0}", e);
        _retryInterval = TimeSpan.FromSeconds(5);
        _maxRetryCount = 4;
    }

    [Test]
    public void Run()
    {
        var resultTask = RetryWrapper.Retry(_work, _onException, _retryInterval, _maxRetryCount);
        Console.WriteLine("This wrapper doesn't block!");

        // Why is this line throwing? I expect the exception to be handled
        // by the wrapper
        var result = resultTask.Result;
        result.Should().NotBeNullOrWhiteSpace();
        result.Should().Be("Sample result!");
    }

    private static string GetSampleResult()
    {
        if (_counter < 3)
        {
            _counter++;
            throw new InvalidOperationException("Baaah!");
        }
        return "Sample result!";
    }
}

However, the AggregateException is thrown instead of being caught. The whole point of using this wrapper is to avoid putting a try-catch around var result = resultTask.Result; so what am I doing wrong?

Please note that I cannot use async/await or Microsoft.Bcl.Async, as I am on .NET 4 with VS 2010.

Upvotes: 3

Views: 264

Answers (3)

Sam Harwell

Reputation: 99949

What you want to do is not easy. In fact, it's so challenging to get "just right" that I wrote a library specifically to solve this type of problem (the Rackspace Threading Library, Apache 2.0 License).

Desired Behavior

The desired behavior for your code is most easily described using async/await, even though the final version of the code will not use this functionality. I made a few changes to your Retry method for this.

  1. To improve the ability of the method to support truly asynchronous operations, I changed the work parameter from a Func<T> to a Func<Task<T>>.
  2. To avoid blocking a thread unnecessarily (even if it's a background thread), I used Task.Delay instead of Thread.Sleep.
  3. I made the assumption that onException is never null. While this assumption is not easy to omit for code using async/await, it could if necessary be addressed for the implementation below.
  4. I made the assumption that if the task returned by work enters the Faulted state, its InnerExceptions property will contain only one exception. If it contains more than one exception, all except the first will be ignored. While this assumption is not easy to omit for code using async/await, it could if necessary be addressed for the implementation below.

Here is the resulting implementation. Note that I could have used a for loop instead of a while loop, but as you will see that would complicate the step that follows.

public async Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    while (count < maxExecutionCount)
    {
        if (count > 0)
            await Task.Delay(retryInterval);

        count++;

        try
        {
            return await work();
        }
        catch (TException ex)
        {
            onException(ex);
        }
        catch (Exception ex)
        {
            throw new RetryWrapperException("Unexpected exception occurred", ex);
        }
    }

    string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
    throw new RetryWrapperException(message);
}

Porting to .NET 4.0

Converting this code to .NET 4.0 (and even .NET 3.5) uses the following features of the Rackspace Threading Library:

  • TaskBlocks.While: To convert the while loop.
  • CoreTaskExtensions.Select: For performing synchronous operations after an antecedent task completes successfully.
  • CoreTaskExtensions.Then: For performing asynchronous operations after an antecedent task completes successfully.
  • CoreTaskExtensions.Catch (new for V1.1): For exception handling.
  • DelayedTask.Delay (new for V1.1): For the behavior of Task.Delay.
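For readers who want to see the idea behind a non-blocking delay on .NET 4, here is a minimal sketch built on System.Threading.Timer and TaskCompletionSource. It is not the library's implementation of DelayedTask.Delay; the name DelaySketch is hypothetical, and the sketch assumes cancellation is not needed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Rackspace Threading Library.
public static class DelaySketch
{
    // Returns a task that completes after the given delay without blocking a thread.
    public static Task Delay(TimeSpan delay)
    {
        var tcs = new TaskCompletionSource<bool>();
        Timer timer = null;

        // Create the timer disabled so that 'timer' is assigned before the callback can run.
        timer = new Timer(_ =>
        {
            timer.Dispose();
            tcs.TrySetResult(true);
        }, null, Timeout.Infinite, Timeout.Infinite);

        // Fire once after 'delay'; a period of -1 ms disables periodic signaling.
        timer.Change(delay, TimeSpan.FromMilliseconds(-1));
        return tcs.Task;
    }
}
```

The returned task can be chained with ContinueWith in the same way a Task.Delay result would be awaited.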

There are a few behavioral differences between this implementation and the one above. In particular:

  • If work returns null, the Task returned by this method will transition to the canceled state (as shown by this test), whereas the method above will transition to the faulted state due to a NullReferenceException.
  • This implementation behaves as though ConfigureAwait(false) were called before every await in the previous implementation. In my mind at least, this is actually not a bad thing.
  • If the onException method throws an exception, this implementation will wrap that exception in a RetryWrapperException. In other words, this implementation actually models this code, rather than the block written in the Desired Behavior section:

    try
    {
        try
        {
            return await work();
        }
        catch (TException ex)
        {
            onException(ex);
        }
    }
    catch (Exception ex)
    {
        throw new RetryWrapperException("Unexpected exception occurred", ex);
    }
    

Here is the resulting implementation:

public static Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    bool haveResult = false;
    T result = default(T);

    Func<bool> condition = () => count < maxExecutionCount;
    Func<Task> body =
        () =>
        {
            Task t1 = count > 0 ? DelayedTask.Delay(retryInterval) : CompletedTask.Default;

            Task t2 =
                t1.Then(
                    _ =>
                    {
                        count++;
                        return work();
                    })
                .Select(
                    task =>
                    {
                        result = task.Result;
                        haveResult = true;
                    });

            Task t3 =
                t2.Catch<TException>(
                    (_, ex) =>
                    {
                        onException(ex);
                    })
                .Catch<Exception>((_, ex) =>
                    {
                        throw new RetryWrapperException("Unexpected exception occurred", ex);
                    });

            return t3;
        };

    Func<Task, T> selector =
        _ =>
        {
            if (haveResult)
                return result;

            string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
            throw new RetryWrapperException(message);
        };

    return
        TaskBlocks.While(condition, body)
        .Select(selector);
}

Sample Test

The following test method demonstrates that the code above functions as described.

[TestMethod]
public void Run()
{
    Func<Task<string>> work = GetSampleResultAsync;
    Action<InvalidOperationException> onException = e => Console.WriteLine("Caught the exception: {0}", e);
    TimeSpan retryInterval = TimeSpan.FromSeconds(5);
    int maxRetryCount = 4;

    Task<string> resultTask = Retry(work, onException, retryInterval, maxRetryCount);
    Console.WriteLine("This wrapper doesn't block");

    var result = resultTask.Result;
    Assert.IsFalse(string.IsNullOrWhiteSpace(result));
    Assert.AreEqual("Sample result!", result);
}

private static int _counter;

private static Task<string> GetSampleResultAsync()
{
    if (_counter < 3)
    {
        _counter++;
        throw new InvalidOperationException("Baaah!");
    }

    return CompletedTask.FromResult("Sample result!");
}

Future Considerations

If you really want to have a rock-solid implementation, I recommend you further modify your code in the following ways.

  1. Support cancellation.

    1. Add a CancellationToken cancellationToken parameter as the last parameter of the Retry method.
    2. Change the type of work to Func<CancellationToken, Task<T>>.
    3. Pass the cancellationToken argument to work, and to the call to DelayedTask.Delay.
  2. Support back-off policies. You could remove the retryInterval and maxExecutionCount parameters and use an IEnumerable<TimeSpan> instead, or you could incorporate an interface like IBackoffPolicy along with a default implementation like BackoffPolicy (both MIT licensed).
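As a rough illustration of the IEnumerable<TimeSpan> option, the sketch below produces an exponential back-off sequence. Backoff.Exponential is a hypothetical helper invented for this example; it is not the IBackoffPolicy interface mentioned above, and the doubling strategy is only one possible policy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only.
public static class Backoff
{
    // Yields 'attempts' delays, doubling each time and never exceeding 'max'.
    public static IEnumerable<TimeSpan> Exponential(TimeSpan initial, TimeSpan max, int attempts)
    {
        TimeSpan delay = initial < max ? initial : max;
        for (int i = 0; i < attempts; i++)
        {
            yield return delay;

            // Double the delay for the next attempt, capped at 'max'.
            long doubledTicks = Math.Min(delay.Ticks * 2, max.Ticks);
            delay = TimeSpan.FromTicks(doubledTicks);
        }
    }
}
```

A Retry overload taking such a sequence could wait for each delay before the next attempt and give up once the sequence ends, which replaces both retryInterval and maxExecutionCount.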

Upvotes: 4

Yuval Itzchakov

Reputation: 149578

There are a couple of things wrong with your code:

  1. As mentioned before, you don't await the executed Task, so there is no chance for it to complete and for the exception to be caught, because the returned Task propagates to the caller immediately. If await isn't available, you'll have to use a continuation-style approach with ContinueWith instead of this implementation.

  2. You're using Thread.Sleep inside your code. For all you know, someone could be calling this from their main thread and not understand why that thread suddenly gets stuck, which is terribly unreliable.

  3. If the exception is not a TException, you rethrow using throw new, which will lose you the stack trace of your method. Is that what you're really after? A better suggestion might be to save all the exceptions and throw an AggregateException at the end of the retries.

This post has what you need:

public static Task StartNewDelayed(int millisecondsDelay, Action action) 
{ 
    // Validate arguments 
    if (millisecondsDelay < 0) 
        throw new ArgumentOutOfRangeException("millisecondsDelay"); 
    if (action == null) throw new ArgumentNullException("action"); 

    // Create the task 
    var t = new Task(action); 
    // Start a timer that will trigger it 
    var timer = new Timer( 
        _ => t.Start(), null, millisecondsDelay, Timeout.Infinite); 
    t.ContinueWith(_ => timer.Dispose());
    return t; 
}

private static Task<T> Retry<T>(Func<T> func, int retryCount, int delay, TaskCompletionSource<T> tcs = null)
{
    if (tcs == null)
        tcs = new TaskCompletionSource<T>();
    Task.Factory.StartNew(func).ContinueWith(_original =>
    {
        if (_original.IsFaulted)
        {
            if (retryCount == 0)
                tcs.SetException(_original.Exception.InnerExceptions);
            else
                // Schedule the next attempt with the StartNewDelayed helper defined above
                StartNewDelayed(delay, () =>
                {
                    Retry(func, retryCount - 1, delay, tcs);
                });
        }
        else
            tcs.SetResult(_original.Result);
    });
    return tcs.Task;
} 
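Point 3 above suggests saving every failure and reporting them together. A minimal sketch of that idea follows, assuming no delay between attempts to keep it short; CollectingRetry and Attempt are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variant that reports all failures once the retries are exhausted.
public static class CollectingRetry
{
    public static Task<T> Retry<T>(Func<T> func, int retryCount)
    {
        var tcs = new TaskCompletionSource<T>();
        Attempt(func, retryCount, new List<Exception>(), tcs);
        return tcs.Task;
    }

    private static void Attempt<T>(Func<T> func, int retriesLeft, List<Exception> failures, TaskCompletionSource<T> tcs)
    {
        Task.Factory.StartNew(func).ContinueWith(attempt =>
        {
            if (!attempt.IsFaulted)
            {
                tcs.SetResult(attempt.Result);
                return;
            }

            // Remember this failure; attempts run one after another, so the list is never shared concurrently.
            failures.AddRange(attempt.Exception.InnerExceptions);

            if (retriesLeft == 0)
                tcs.SetException(failures); // surfaces as a single AggregateException on the returned task
            else
                Attempt(func, retriesLeft - 1, failures, tcs);
        });
    }
}
```

With this shape, a caller inspecting the returned task's Exception.InnerExceptions sees one entry per failed attempt rather than only the last one.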

Upvotes: 0

Scott Chamberlain

Reputation: 127593

If you want this wrapper to work you will need to use async/await or write the catch block using a continuation. This is because the exception is not thrown until you try to get the result.
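To make that timing concrete, here is a small sketch (FaultDemo is a hypothetical name) that reports where the exception actually surfaces. StartNew only schedules the delegate, so the try block around it completes normally; the failure is stored in the task and rethrown, wrapped in an AggregateException, when Result is read.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class for illustration only.
public static class FaultDemo
{
    // Returns a short description of where the exception was observed.
    public static string Observe()
    {
        Func<string> work = () => { throw new InvalidOperationException("Baaah!"); };
        Task<string> task;

        try
        {
            // Scheduling never throws the delegate's exception.
            task = Task.Factory.StartNew(work);
        }
        catch (Exception)
        {
            return "thrown by StartNew";
        }

        try
        {
            return task.Result;
        }
        catch (AggregateException ae)
        {
            // The stored exception surfaces here, wrapped in an AggregateException.
            return "thrown by Result: " + ae.InnerException.GetType().Name;
        }
    }
}
```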

To do it with async/await on .NET 4.0, download Microsoft.Bcl.Async, which adds async/await support to .NET 4.0:

public static async Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return await Task.Factory.StartNew(work);
        } 
        catch (TException e)
        {
            // await rethrows the task's original exception rather than an AggregateException,
            // so catch the expected exception type directly.
            // allow program to continue in this case
            // do necessary logging or whatever
            if (onException != null) { onException(e); }

            Thread.Sleep(retryInterval);
        }
        catch (Exception e)
        {
            throw new RetryWrapperException("Unexpected exception occurred", e);
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}

I am not sure exactly how to do it without async/await. I know it would involve .ContinueWith( with TaskContinuationOptions.OnlyOnFaulted, but I am not sure of the exact details.
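One possible shape for the continuation-based version is sketched below, under the assumption that no delay between attempts and no exception filtering are needed. ContinuationRetry and Attempt are hypothetical names, and this is only a sketch of the OnlyOnFaulted approach, not a tested replacement for the wrapper.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch of a retry driven by ContinueWith instead of await.
public static class ContinuationRetry
{
    public static Task<T> Retry<T>(Func<T> work, int maxExecutionCount)
    {
        var tcs = new TaskCompletionSource<T>();
        Attempt(work, maxExecutionCount, tcs);
        return tcs.Task;
    }

    private static void Attempt<T>(Func<T> work, int remaining, TaskCompletionSource<T> tcs)
    {
        Task<T> task = Task.Factory.StartNew(work);

        // Runs only when the attempt succeeded.
        task.ContinueWith(t => tcs.SetResult(t.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        // Runs only when the attempt threw; reading t.Exception marks it as observed.
        task.ContinueWith(t =>
        {
            AggregateException error = t.Exception;
            if (remaining > 1)
                Attempt(work, remaining - 1, tcs);
            else
                tcs.SetException(error.InnerExceptions);
        }, TaskContinuationOptions.OnlyOnFaulted);
    }
}
```

The caller gets the TaskCompletionSource's task immediately, and it only faults after the final attempt has failed.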

Upvotes: 1
