Reputation: 65
I have the following problem: I have a subject that uses OnNext to signal new events. However, an exception can occur, which is simulated in the code below by the call to OnError. The cause of the error is time dependent, so if I repeat the same action a few times, it will eventually succeed. I therefore want to use the Catch method, which accepts a function that generates a new sequence, to repeat the action, say, 5 times. If all repeated attempts fail, it should throw a final exception; if at least one succeeds, it should continue with OnNext(5) and OnNext(6).
Ideally the exception and the subsequent OnError call would be prevented, but that isn't possible in my case.
I have tried multiple approaches with Retry, Concat, Catch, etc., but nothing worked the way I wanted.
Subject<int> sub = new Subject<int>();
var seq = sub.Select(x =>
    {
        // time dependent operation
        Console.WriteLine(x);
        return x;
    })
    .Catch((SeqException<int> ex) =>
    {
        // which sequence should be returned here to achieve the desired behaviour?
        return Observable.Empty(0);
    });
seq.Subscribe();
sub.OnNext(1);
sub.OnNext(2);
sub.OnNext(3);
sub.OnError(new SeqException<int> { Value = 4 });
sub.OnNext(5);
sub.OnNext(6);
seq.Wait();
Thanks in advance.
Upvotes: 0
Views: 1069
Reputation: 12667
Case 1: The time dependent operation inside the Select is what you need to retry.
Use Observable.Start to convert a thunk into an observable. Then you get to use all the recovery operators to declare your behavior.
Subject<int> sub = new Subject<int>();
var seq = sub.SelectMany(x =>
    Observable.Start(() =>
        {
            // time dependent failure
            if (DateTime.Now.Second % 2 == 0)
                throw new Exception();
            Console.WriteLine(x);
            return x;
        })
        .Retry(5)                     // up to 5 attempts in total
        .Catch(Observable.Return(x))  // fallback if every attempt failed
);
// for testing
Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => (int)x).Subscribe(sub);
seq.Wait();
Case 2: You're actually getting signalled through the Subject.
The Rx contract requires that no more notifications occur after OnCompleted or OnError. On receiving that error, the subject disposes all its subscriptions and the pipeline is torn down. You need to bring it around to Case 1 to make it work.
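To make the contract concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The names ContractDemo and Run are hypothetical and only serve the illustration. It records every notification an observer receives and shows that the OnNext issued after OnError never arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

static class ContractDemo
{
    // Returns the notifications the observer actually received, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        var sub = new Subject<int>();

        sub.Subscribe(
            x => log.Add($"OnNext({x})"),
            ex => log.Add($"OnError({ex.Message})"));

        sub.OnNext(1);
        sub.OnError(new InvalidOperationException("boom"));
        sub.OnNext(5); // the subject has already terminated, so this is dropped
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

Only the first OnNext and the OnError are logged, which is why the recovery has to happen per item (as in Case 1) rather than on the subject itself.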
Upvotes: 1
Reputation: 19096
You can always wrap anything in a retry block. Here is an example for an action with one argument.
Action<T> RetryAction<T>( Action<T> action, int retries )
{
    return arg => {
        int count = 0;
        while ( true )
        {
            try
            {
                action( arg );
                return;
            }
            catch
            {
                // give up once the allowed number of retries is used up
                if ( count == retries )
                    throw;
                count++;
            }
        }
    };
}
So you do not have to change the logic of the sequence handling.
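As a usage sketch for the helper above: the flaky operation below is hypothetical (it fails twice and then succeeds), and RetryDemo and Run are illustrative names. Note that with this helper, retries = 5 allows up to 6 attempts in total: the first call plus 5 retries.

```csharp
using System;

static class RetryDemo
{
    // Same logic as the helper above, made static so it can be called from Run.
    public static Action<T> RetryAction<T>(Action<T> action, int retries)
    {
        return arg =>
        {
            int count = 0;
            while (true)
            {
                try
                {
                    action(arg);
                    return;
                }
                catch
                {
                    if (count == retries)
                        throw;
                    count++;
                }
            }
        };
    }

    // Runs a hypothetical flaky action through the wrapper and
    // returns how many attempts were needed.
    public static int Run()
    {
        int attempts = 0;
        Action<int> flaky = x =>
        {
            attempts++;
            if (attempts < 3)
                throw new InvalidOperationException("transient failure");
            Console.WriteLine($"processed {x} after {attempts} attempts");
        };

        RetryAction(flaky, 5)(4);
        return attempts;
    }

    static void Main()
    {
        Run();
    }
}
```

In the sequence from the question, the wrapped action could presumably be invoked inside the Select lambda, so that a transient failure is absorbed before it ever reaches OnError.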
Upvotes: 1