xplat

Reputation: 8626

Retry async Task code using Reactive Extensions

I have the code below in my data access class.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            using (var connection = Connection)
            {
                var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                Task<IEnumerable<TEntity>> queryTask =
                    connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                        commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                connection.Close();
                connection.Dispose();
                tokenSource.Dispose();
                return data;
            }
        }

I want to retry once when a SqlException is thrown. Keep in mind that I can't apply Rx to the whole application, only to this block of code.

I tried the code below. It appears to execute correctly and the Do operator logs to the console output, but the Catch handler is never invoked, and I'm not sure whether Retry is executed either.

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            return await Observable.Defer(async () =>
            {
                using (var connection = Connection)
                {
                    var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                    Task<IEnumerable<TEntity>> queryTask =
                        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                            commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                    IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                    connection.Close();
                    connection.Dispose();
                    tokenSource.Dispose();
                    return Observable.Return(data);
                }
            })
            .Catch<IEnumerable<TEntity>, SqlException>(source =>
           {
               Debug.WriteLine($"QueryAsync Exception {source}");
               return Observable.Return(new List<TEntity>());
           })
           .Throttle(TimeSpan.FromMilliseconds(500))
           .Retry(1)
           .Do(_ => Debug.WriteLine("Do QueryAsync"));
        }

Upvotes: 4

Views: 869

Answers (1)

Kent Boogaart

Reputation: 178630

I can see several potential problems with your code:

  • Separate the retry logic from the main logic, for example in a method called QueryWithRetryAsync. This is only a design issue, but an issue nonetheless.
  • Don't Catch until after the Retry. Otherwise the Catch turns the SqlException into an empty list, and the Retry operator never sees the exception.
  • I don't think the Throttle is necessary at all, since you only ever expect one value through the pipeline.
  • Retry(1) does not do what you think it does (this was a surprise to me, too). The retry count seems to include the first invocation, so you need Retry(2) to get one retry.
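The last point is easy to check in isolation. The sketch below is only an illustration: RetryCountDemo and CountAttempts are made-up names, and it assumes the failing source and the resubscription both run synchronously, which I believe holds for Observable.Throw with its default scheduler. It counts how often Retry subscribes to a source that always fails.

```csharp
using System;
using System.Reactive.Linq;

static class RetryCountDemo
{
    // Hypothetical helper: counts how many times Retry(retryCount)
    // subscribes to a source that always fails.
    public static int CountAttempts(int retryCount)
    {
        var attempts = 0;

        // Defer runs its factory on every subscription,
        // so each attempt bumps the counter.
        var source = Observable.Defer(() =>
        {
            attempts++;
            return Observable.Throw<int>(new InvalidOperationException("always fails"));
        });

        // Swallow the final error; only the attempt count matters here.
        source.Retry(retryCount).Subscribe(_ => { }, ex => { });

        return attempts;
    }

    static void Main()
    {
        Console.WriteLine($"Retry(1): {CountAttempts(1)} attempt(s)");
        Console.WriteLine($"Retry(2): {CountAttempts(2)} attempt(s)");
    }
}
```

If the count really does include the first invocation, Retry(1) should report a single attempt and Retry(2) should report two.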

Here is a standalone example that behaves the way you want:

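using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Program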
{
    static void Main(string[] args)
    {
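        var pipeline = Observable
            // Defer calls DoSomethingAsync again on every subscription, so each retry is a fresh attempt.
            .Defer(() => DoSomethingAsync().ToObservable())
            // Two attempts in total: the original call plus one retry.
            .Retry(2)
            // Catch comes last, so it only sees a failure that survives the retry.
            .Catch<string, InvalidOperationException>(ex => Observable.Return("default"));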

        pipeline
            .Do(Console.WriteLine)
            .Subscribe();

        Console.ReadKey();
    }

    private static int invocationCount = 0;

    private static async Task<string> DoSomethingAsync()
    {
        Console.WriteLine("Attempting DoSomethingAsync");

        await Task.Delay(TimeSpan.FromSeconds(2));

        ++invocationCount;

        if (invocationCount == 2)
        {
            return "foo";
        }

        throw new InvalidOperationException();
    }
}
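To keep the retry logic separate from the query itself, the same pipeline could be wrapped in a small generic helper. This is a minimal sketch, not a drop-in implementation: RetryHelper, WithRetryAsync and FlakyAsync are hypothetical names, and the helper retries on any error, not only on TException. Only the final fallback is restricted to that exception type.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class RetryHelper
{
    // Hypothetical helper: runs the task factory, retries once on any error,
    // and returns the fallback only if a TException is still thrown afterwards.
    public static async Task<T> WithRetryAsync<T, TException>(
        Func<Task<T>> action, T fallback) where TException : Exception
    {
        return await Observable
            .Defer(() => action().ToObservable()) // fresh task per subscription
            .Retry(2)                             // first attempt plus one retry
            .Catch<T, TException>(ex => Observable.Return(fallback));
    }
}

class RetryHelperDemo
{
    private static int calls = 0;

    // Fails on the first call and succeeds on every later call.
    private static async Task<string> FlakyAsync()
    {
        await Task.Delay(10);
        if (++calls == 1)
        {
            throw new InvalidOperationException("first call fails");
        }
        return "ok";
    }

    static async Task Main()
    {
        var result = await RetryHelper
            .WithRetryAsync<string, InvalidOperationException>(FlakyAsync, "default");
        Console.WriteLine(result); // prints "ok": the second attempt succeeds
    }
}
```

In the data access class, the original QueryAsync body would stay as it is in a private method, and the public method would pass a lambda calling it to WithRetryAsync with SqlException as TException and an empty List<TEntity> as the fallback.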

Upvotes: 6
