Reputation: 4523
Currently I'm using Observable.retry(n) in order to retry in case of error. Please take a look at this code:
@Test
public void testObservableRetry() {
    Observable
        .<String>defer(() -> Observable
            .just(Notification.createOnNext("valid value"), Notification.createOnError(new Error("Test")))
            .dematerialize())
        .doOnNext(v -> System.out.println("onNext: " + v))
        .doOnError(e -> System.out.println("onError: " + e.getMessage()))
        .doOnSubscribe(d -> System.out.println("onSubscribe"))
        .doOnDispose(() -> System.out.println("onDispose"))
        .retry(2)
        .take(2) // .take(4) will fail the test
        .blockingLast();
}
The code above is able to resume after an error, but the retry count limits the total number of errors, not the number of consecutive errors.
E.g. if you replace .take(2) with .take(4), the test will fail, because the total number of errors exceeds the retry limit of 2, despite the fact that after each error the stream resumes and is able to deliver the next value:
---(v1)----(error)---(v2)----(error)----(v3)-----(error)---(v4)-
I'd like to find a way to reset the counter after a successful resume. The use case is, for example, a network connection: I'd like to allow the same number of tries on each disconnection, but reset the counter after each successful connect, so that the flow can continue indefinitely as long as it manages to resume within a constant number of tries each time.
EDIT:
Successful resume in this context means receiving at least 1 item after retry. So I'd like to limit only the number of consecutive errors, and not total errors.
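To make the difference between the two counting rules concrete, here is a library-free sketch in plain Java. It does not use RxJava at all: `RetryBudgetModel`, `itemsDelivered` and the `attemptEmitsItem` script are hypothetical names made up for illustration, and every scripted attempt is assumed to end in an error, as in the test above. With `resetOnItem` off it mimics a total budget like retry(n); with it on, the budget is restored whenever an attempt delivers an item.

```java
/** Library-free model of retry budgets; not RxJava code, all names are illustrative. */
final class RetryBudgetModel {

    /**
     * Replays a scripted list of attempts, each of which ends in an error.
     * attemptEmitsItem[i] says whether attempt i delivers one item before failing.
     * Returns how many items reach the consumer before the error propagates
     * (or before the script runs out).
     */
    static int itemsDelivered(boolean[] attemptEmitsItem, int maxRetries, boolean resetOnItem) {
        int retriesLeft = maxRetries;
        int delivered = 0;
        for (boolean emits : attemptEmitsItem) {
            if (emits) {
                delivered++;
                if (resetOnItem) {
                    retriesLeft = maxRetries; // successful resume: restore the full budget
                }
            }
            // The attempt fails here. With no retries left, the error propagates.
            if (retriesLeft == 0) {
                break;
            }
            retriesLeft--; // spend one retry and resubscribe
        }
        return delivered;
    }

    public static void main(String[] args) {
        boolean[] alwaysResumes = {true, true, true, true};
        // Total budget of 2 retries: the third error ends the stream after 3 items.
        System.out.println(itemsDelivered(alwaysResumes, 2, false)); // 3
        // Consecutive budget of 2 retries: every item resets the counter, all 4 arrive.
        System.out.println(itemsDelivered(alwaysResumes, 2, true));  // 4
        // Two retries in a row without an item still exhaust the consecutive budget.
        boolean[] stalls = {true, false, false, false, true};
        System.out.println(itemsDelivered(stalls, 2, true));         // 1
    }
}
```

The first call corresponds to the failing .take(4) case: only three values arrive before the third error is propagated. The second call is the behavior being asked for.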
Upvotes: 2
Views: 1444
Reputation: 4523
This is just a slightly modified version of @akarnokd's solution (it only removes the nonEmpty flag). @akarnokd, feel free to edit your solution and I will delete this one.
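static <T> ObservableTransformer<T, T> retryEmpty(int count) {
    return o -> {
        // Attempts left before the error is passed downstream
        AtomicInteger remaining = new AtomicInteger(count);
        // Every received item restores the full budget
        return o.doOnNext(v -> remaining.lazySet(count))
            .retryWhen(err -> err.flatMap(e ->
                (remaining.decrementAndGet() == 0)
                    ? Observable.<T>error(e)    // budget used up: fail
                    : Observable.just(1)));     // any item triggers a resubscribe
    };
}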
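Dropping the flag does seem to change the counting slightly. Without it, an error that follows an item still decrements the freshly reset counter, whereas the version with the nonEmpty flag resets the counter and retries without decrementing. The sketch below is a plain-Java model of the two error handlers, not RxJava code; `RetryCounterComparison`, `attemptsMade` and the scripted `attemptEmitsItem` array are hypothetical names, and each scripted attempt is assumed to emit at most one item and then fail.

```java
/** Plain-Java model of the two retryEmpty counters; names are illustrative only. */
final class RetryCounterComparison {

    /**
     * Returns the number of attempts (subscriptions) made before the error is
     * propagated, or the script length if the error never propagates within it.
     */
    static int attemptsMade(boolean[] attemptEmitsItem, int count, boolean useNonEmptyFlag) {
        int remaining = count;
        int attempts = 0;
        for (boolean emits : attemptEmitsItem) {
            attempts++;
            if (useNonEmptyFlag) {
                if (emits) {
                    remaining = count; // reset only; this error does not cost a retry
                    continue;
                }
            } else if (emits) {
                remaining = count;     // doOnNext resets, but the error below still decrements
            }
            if (--remaining == 0) {
                return attempts;       // error propagates downstream
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        boolean[] alwaysEmits = {true, true, true};
        System.out.println(attemptsMade(alwaysEmits, 1, false)); // 1: never retries
        System.out.println(attemptsMade(alwaysEmits, 1, true));  // 3: retries through the whole script
        boolean[] neverEmits = {false, false, false};
        System.out.println(attemptsMade(neverEmits, 2, false));  // 2
        System.out.println(attemptsMade(neverEmits, 2, true));   // 2
    }
}
```

For sources that never emit, both variants behave the same. The difference only shows after an attempt that delivered an item, where the flagged version allows one more attempt. Separately, the counter in the code above is created once per compose call rather than once per subscription, because the Observable.defer wrapper is gone; if the resulting Observable is subscribed to more than once, wrapping the body in defer as in the original keeps the state per subscriber.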
Upvotes: 3
Reputation: 69997
You need retryWhen, plus a way to communicate with it from a point earlier in the flow:
@Test
public void emptyError() {
    AtomicInteger c = new AtomicInteger();
    Observable.fromCallable(() -> { throw new Exception("" + c.incrementAndGet()); })
        .compose(retryEmpty(2))
        .test()
        .assertFailureAndMessage(Exception.class, "2");
}

@Test
public void nonEmptyError() {
    Observable.just(1).concatWith(Observable.error(new Exception()))
        .compose(retryEmpty(2))
        .take(4)
        .test()
        .assertResult(1, 1, 1, 1);
}

static <T> ObservableTransformer<T, T> retryEmpty(int count) {
    return o ->
        Observable.defer(() -> {
            AtomicInteger remaining = new AtomicInteger(count);
            AtomicBoolean nonEmpty = new AtomicBoolean();
            return o.doOnNext(v -> nonEmpty.lazySet(true))
                .retryWhen(err ->
                    err.flatMap(e -> {
                        if (nonEmpty.get()) {
                            nonEmpty.lazySet(false);
                            remaining.lazySet(count);
                        } else if (remaining.decrementAndGet() == 0) {
                            return Observable.error(e);
                        }
                        return Observable.just(1);
                    })
                );
        });
}
Upvotes: 2