Tedd Parsile

Reputation: 65

How to retry an action on OnError in Reactive Extensions (IObservable) several times and continue if successful

I have the following problem: I have a subject that uses OnNext to signal new events. However, an exception can occur, which is simulated in the code below with a call to OnError. The cause of the error is time-dependent, so if I repeat the same action a few times, it will succeed. So I want to use the Catch method, which accepts a function generating new sequences, to repeat the action, let's say, 5 times. If all repeated attempts fail, it should throw a final exception; if at least one succeeds, it should continue with OnNext(5) and OnNext(6).

Ideally the exception and the subsequent OnError call should be prevented, but that isn't possible in my case.

I have tried multiple scenarios with methods such as Retry, Concat and Catch, but nothing worked as I wanted.

Subject<int> sub = new Subject<int>();

var seq = sub.Select(x =>
{
    //time dependent operation
    Console.WriteLine(x);
    return x;
}).
Catch((SeqException<int> ex) =>
{
    return Observable.Empty(0); // what sequence to return to achieve the desired behaviour
});
seq.Subscribe();

sub.OnNext(1);
sub.OnNext(2);
sub.OnNext(3);
sub.OnError(new SeqException<int>{ Value = 4});
sub.OnNext(5);
sub.OnNext(6);

seq.Wait();

Thanks in advance.

Upvotes: 0

Views: 1069

Answers (2)

Asti

Reputation: 12667

Case 1: The time-dependent operation inside the Select is what you need to retry.

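Use Observable.Start to convert a thunk into an observable, and wrap it in Observable.Defer so the thunk runs again on every subscription. (Observable.Start on its own invokes the thunk once and replays the cached result, so Retry would only see the same error again.) Then you get to use all the recovery operators to declare your behavior.

Subject<int> sub = new Subject<int>();

var seq = sub.SelectMany(x =>
        // Defer re-invokes Start for each subscription made by Retry
        Observable.Defer(() => Observable.Start(() =>
        {
            //time dependent failure
            if (DateTime.Now.Second % 2 == 0)
                throw new Exception();

            Console.WriteLine(x);
            return x;
        }))
        .Retry(5)                    // up to 5 attempts in total
        .Catch(Observable.Return(x)) // fall back to x if every attempt fails
    );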

//for testing
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub);

seq.Wait();
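
The question also asks for the final exception to surface when every attempt fails, which the fallback above hides. A minimal sketch of that variant follows, assuming System.Reactive is referenced. RetryThenFail, WithRetries, work and attempts are hypothetical names chosen for illustration, not part of Rx.

```csharp
using System;
using System.Reactive.Linq;

public static class RetryThenFail
{
    // Hypothetical helper: runs `work` for each element, making up to
    // `attempts` tries in total before giving up.
    public static IObservable<int> WithRetries(
        IObservable<int> source, Func<int, int> work, int attempts)
    {
        return source.SelectMany(x =>
            // Defer makes each subscription created by Retry invoke `work` again.
            Observable.Defer(() => Observable.Start(() => work(x)))
                      // No Catch here: if the last attempt also fails,
                      // its exception reaches the subscriber's OnError.
                      .Retry(attempts));
    }
}
```

With this shape, a subscriber sees the element once an attempt succeeds, and sees OnError only after the last attempt has failed.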

Case 2: The error is actually signalled through the Subject.

The Rx contract requires that no more notifications occur after OnCompleted or OnError. On receiving that error, the subject drops all its subscriptions and the pipeline is torn down, so the later OnNext(5) and OnNext(6) calls never reach the observer. You need to bring it around to Case 1 to make it work.
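
To see why Case 2 cannot be recovered downstream, here is a small sketch (assuming System.Reactive is referenced; TerminatedSubjectDemo and Run are hypothetical names) that mirrors the calls in the question and records what the observer actually receives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class TerminatedSubjectDemo
{
    public static List<int> Run()
    {
        var sub = new Subject<int>();
        var received = new List<int>();

        sub.Subscribe(
            x => received.Add(x),
            ex => Console.WriteLine("error: " + ex.Message));

        sub.OnNext(1);
        sub.OnError(new InvalidOperationException("simulated failure"));
        sub.OnNext(5); // ignored: the subject has already terminated
        sub.OnNext(6); // ignored as well

        return received; // contains only 1
    }
}
```

Because the subject itself has terminated, no operator placed after it can bring the later values back; the failure has to be kept out of the subject's OnError channel in the first place.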

Upvotes: 1

Sir Rufo

Reputation: 19096

You can always wrap anything in a retry block. Here is an example for an action with one argument.

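// Wraps an action so that it is retried up to `retries` times after the first failure.
Action<T> RetryAction<T>( Action<T> action, int retries )
{
    return arg => {
        int count = 0;
        while ( true )
        {
            try
            {
                action( arg );
                return; // success, stop retrying
            }
            catch
            {
                // out of retries: rethrow the last exception
                if ( count == retries )
                    throw;
                count++;
            }
        }
    };
}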

So you do not have to change the logic of the sequence handling.
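
As a rough usage sketch, the wrapper can be exercised without Rx at all. RetryDemo, Flaky and Attempts below are hypothetical names, and the failure is simulated by a counter instead of a real time-dependent condition.

```csharp
using System;

public static class RetryDemo
{
    // Same shape as the helper above, repeated so the sketch is self-contained.
    public static Action<T> RetryAction<T>(Action<T> action, int retries)
    {
        return arg =>
        {
            int count = 0;
            while (true)
            {
                try { action(arg); return; }
                catch
                {
                    if (count == retries) throw;
                    count++;
                }
            }
        };
    }

    public static int Attempts;

    // Hypothetical flaky operation: fails on the first two calls, then succeeds.
    public static void Flaky(int x)
    {
        Attempts++;
        if (Attempts <= 2) throw new InvalidOperationException("not ready yet");
        Console.WriteLine(x);
    }

    public static int Run()
    {
        Attempts = 0;
        Action<int> safe = RetryAction<int>(Flaky, 5);
        safe(4);         // two failures are swallowed, the third attempt prints 4
        return Attempts; // 3
    }
}
```

Inside the pipeline, the wrapped action would be called from the existing Select, for example `sub.Select(x => { safe(x); return x; })`, so the subject never sees an error unless every retry fails.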

Upvotes: 1
