ohadinho

Reputation: 7144

Retry mechanism with observable

I'm trying to write a retry mechanism using C# observables.

  1. The retry has a retry count and a retry interval.
  2. Each attempt should execute the "OnExecute" method.
  3. On every exception, it should execute the "OnCatch" method.

Here is what I tried to do:

public static IObservable<T> Retry(GenericRetryExecutorRequest<T> request)
{
    var source = Observable.Timer(TimeSpan.Zero, request.Interval)
        .Select(item =>
        {
          return request.GenericRetryActions.OnExecute();
        });

    var retryObservable = source
        .Retry(request.RetryCount)
        .Catch(source);

    return retryObservable;
}

public class GenericRetryExecutorRequest<T>
{
    public int RetryCount { get; set; } = 3; 
    public TimeSpan Interval { get; set; } = new TimeSpan(0,0,0,5);
    public IGenericRetryActions<T> GenericRetryActions { get; set; }
}

public interface IGenericRetryActions<out T>
{
    T OnExecute();
    void OnCatch();
}

Unfortunately, it doesn't work as intended:

  1. I don't know how to execute OnCatch when an exception is thrown. I've tried many ways with no success.
  2. OnExecute doesn't seem to run repeatedly (at the requested interval) when it throws an exception.

Upvotes: 2

Views: 718

Answers (1)

Shlomo

Reputation: 14350

Try this:

public static IObservable<T> Retry<T>(this GenericRetryExecutorRequest<T> request)
{
    return Observable.Timer(TimeSpan.Zero, request.Interval)
        .Select(item =>
        {
            try
            {
                var value = request.GenericRetryActions.OnExecute();
                return Notification.CreateOnNext(value);
            }
            catch(Exception e)
            {
                request.GenericRetryActions.OnCatch();
                return Notification.CreateOnError<T>(e);
            }
        })
        .Dematerialize()
        .Retry(request.RetryCount);
}

Generally, using try-catch inside an observable pipeline is frowned upon; the preferred approach is the observable's own OnError handling. However, your interface's OnExecute returns a plain T rather than an IObservable<T>, so you're forced to use try-catch. If you changed the interface to return IObservable<T>, then I think this would work:

public class GenericRetryExecutorRequest2<T>
{
    public int RetryCount { get; set; } = 3;
    public TimeSpan Interval { get; set; } = new TimeSpan(0, 0, 0, 5);
    public IGenericRetryActions2<T> GenericRetryActions { get; set; }
}

public interface IGenericRetryActions2<out T>
{
    IObservable<T> OnExecute();
    void OnCatch();
}

public static IObservable<T> Retry2<T>(this GenericRetryExecutorRequest2<T> request)
{
    return Observable.Timer(TimeSpan.Zero, request.Interval)
        .SelectMany(_ => request.GenericRetryActions.OnExecute())
        .Catch((Exception e) => Observable.Return(Unit.Default)
            .Do(_ => request.GenericRetryActions.OnCatch())
            .SelectMany(Observable.Throw<T>(e))
        )
        .Retry(request.RetryCount);
}
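
If your existing work is synchronous, one way to satisfy the IObservable<T>-returning interface is a small adapter. This is only a sketch: DelegateRetryActions is a hypothetical name that isn't part of your code or of Rx, and it assumes the System.Reactive package is referenced. It relies on Observable.Defer, which runs the factory on subscription and surfaces an exception thrown by it as an OnError notification, so the Catch above gets to see it.

```csharp
using System;
using System.Reactive.Linq;

public interface IGenericRetryActions2<out T>
{
    IObservable<T> OnExecute();
    void OnCatch();
}

// Hypothetical adapter (not from the original code): wraps synchronous
// delegates so they fit the observable-returning interface.
public class DelegateRetryActions<T> : IGenericRetryActions2<T>
{
    private readonly Func<T> _execute;
    private readonly Action _onCatch;

    public DelegateRetryActions(Func<T> execute, Action onCatch)
    {
        _execute = execute ?? throw new ArgumentNullException(nameof(execute));
        _onCatch = onCatch ?? throw new ArgumentNullException(nameof(onCatch));
    }

    // Defer postpones the call until someone subscribes. If _execute throws,
    // the subscriber receives the exception through OnError instead of it
    // escaping synchronously from OnExecute().
    public IObservable<T> OnExecute() =>
        Observable.Defer(() => Observable.Return(_execute()));

    public void OnCatch() => _onCatch();
}
```

You would then assign something like new DelegateRetryActions<int>(() => DoWork(), () => Log()) to GenericRetryActions, where DoWork and Log stand in for your own methods.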

That's all assuming you want the mechanism to keep firing on success. If not, add a Take(1) at the end of either solution.
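
To illustrate the Take(1) variant, here is a minimal self-contained sketch built on the first solution. FlakyAction, RetryUntilFirstSuccess and Demo are hypothetical names made up for the example, and the 100 ms interval and retry count of 5 are arbitrary; it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public interface IGenericRetryActions<out T>
{
    T OnExecute();
    void OnCatch();
}

public class GenericRetryExecutorRequest<T>
{
    public int RetryCount { get; set; } = 3;
    public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(5);
    public IGenericRetryActions<T> GenericRetryActions { get; set; }
}

public static class RetryExtensions
{
    // Same pipeline as the first solution, with Take(1) appended so the
    // sequence completes after the first successful value.
    public static IObservable<T> RetryUntilFirstSuccess<T>(
        this GenericRetryExecutorRequest<T> request)
    {
        return Observable.Timer(TimeSpan.Zero, request.Interval)
            .Select(_ =>
            {
                try
                {
                    var value = request.GenericRetryActions.OnExecute();
                    return Notification.CreateOnNext(value);
                }
                catch (Exception e)
                {
                    request.GenericRetryActions.OnCatch();
                    return Notification.CreateOnError<T>(e);
                }
            })
            .Dematerialize()
            .Retry(request.RetryCount)
            .Take(1);
    }
}

// Hypothetical action for the demo: fails on the first two attempts.
public class FlakyAction : IGenericRetryActions<string>
{
    public int Attempts { get; private set; }
    public int Catches { get; private set; }

    public string OnExecute()
    {
        Attempts++;
        if (Attempts < 3)
            throw new InvalidOperationException($"Attempt {Attempts} failed");
        return $"Succeeded on attempt {Attempts}";
    }

    public void OnCatch() => Catches++;
}

public static class Demo
{
    public static string Run(FlakyAction action)
    {
        var request = new GenericRetryExecutorRequest<string>
        {
            RetryCount = 5,
            Interval = TimeSpan.FromMilliseconds(100),
            GenericRetryActions = action
        };

        // Wait() blocks until the sequence completes and returns its last element.
        return request.RetryUntilFirstSuccess().Wait();
    }
}
```

Calling Demo.Run(new FlakyAction()) should return the value from the third attempt, with OnCatch having run twice. One thing to be aware of: as far as I can tell, every resubscription made by Retry starts a fresh timer whose first tick is immediate, so the interval only spaces out ticks within a single subscription, not the retries after a failure.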

Upvotes: 4
