Reputation: 43
To say that I have some code like this
Observable.concat(scores)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe({...});
Due to the instability on the server side, an onError()
notification (eg error code 500) may be poped out when emitting one of the Observables and the concat()
operator will be stopped from emitting the rest of Observables as noted in the doc.
So, the failed Observable needs to be emitting again as well as the rest of Observables.
From my point of view, I try to use toBlocking() operator
to turn the sequence of Observables into a blocking Observable and forEach()
it
List<Observable> list = createListOfScores();
Observable.from(list)
.toBlocking()
.forEach(new Action1<Observable>() {
@Override
public void call(Observable observable) {
observable.subscribe(...).onErrorResumeNext(...)
}
});
There will be better solution than this. I hope someone can enlighten me.
Upvotes: 2
Views: 333
Reputation: 14660
Another way could to use retry
method if onError
is called for some Observable
. However, in this case, Observable
s that run without any errors would have to be removed from the list so that they're not run again. Here's an example (here I retry to run a task 10
times before giving up):
@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
public class RxTest {
@Test
public void testRetryObservableConcat() throws Exception {
final List<Observable<String>> observables = new CopyOnWriteArrayList<>(getObservables());
Observable.concat(observables)
//remove the observable if it's successful
.doOnNext(result -> observables.remove(0))
.doOnError(error -> System.out.println(error.getMessage()))
//if an error is thrown then retry the task only for this Observable
.retry(10, error -> true)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> System.out.println(result),
error -> System.out.println("onError called!"));
Thread.sleep(1000);
ShadowLooper.runUiThreadTasks();
}
private List<Observable<String>> getObservables() {
List<Observable<String>> observables = new ArrayList<>();
final Random random = new Random();
final AtomicInteger atomicInteger = new AtomicInteger(1);
for (int i = 0; i < 3; i++) {
final int id = i;
observables.add(Observable.fromCallable(() -> {
//for second Observable there's a 2/3 probability that it won't succeed
if (id == 1 && random.nextInt() % 3 != 0) {
throw new RuntimeException("Observable " + id + " failed!");
}
return "Observable #" + id + " returns " + atomicInteger.getAndIncrement();
}));
}
return observables;
}
}
Output
Observable 1 failed!
Observable 1 failed!
Observable 1 failed!
Observable #0 returns 1
Observable #1 returns 2
Observable #2 returns 3
As it can be seen from the example, after the second Observable
finally succeeds each result is delivered in order.
Upvotes: 3