kolibreath
kolibreath

Reputation: 43

Retry an Observable in a merge or concat sequence

To say that I have some code like this

Observable.concat(scores)
  .observeOn(AndroidSchedulers.mainThread())
  .subscribeOn(Schedulers.io())
  .subscribe({...});

Due to the instability on the server side, an onError() notification (eg error code 500) may be poped out when emitting one of the Observables and the concat() operator will be stopped from emitting the rest of Observables as noted in the doc.

So, the failed Observable needs to be emitting again as well as the rest of Observables.

From my point of view, I try to use toBlocking() operator to turn the sequence of Observables into a blocking Observable and forEach() it

List<Observable> list = createListOfScores();
Observable.from(list)
  .toBlocking()
  .forEach(new Action1<Observable>() {
    @Override
    public void call(Observable observable) {
      observable.subscribe(...).onErrorResumeNext(...)
    }
  });

There will be better solution than this. I hope someone can enlighten me.

Upvotes: 2

Views: 333

Answers (1)

Anatolii
Anatolii

Reputation: 14660

Another way could to use retry method if onError is called for some Observable. However, in this case, Observables that run without any errors would have to be removed from the list so that they're not run again. Here's an example (here I retry to run a task 10 times before giving up):

@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
public class RxTest {
    @Test
    public void testRetryObservableConcat() throws Exception {
        final List<Observable<String>> observables = new CopyOnWriteArrayList<>(getObservables());
        Observable.concat(observables)
          //remove the observable if it's successful
          .doOnNext(result -> observables.remove(0))
          .doOnError(error -> System.out.println(error.getMessage()))
          //if an error is thrown then retry the task only for this Observable
          .retry(10, error -> true)
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(result -> System.out.println(result),
            error -> System.out.println("onError called!"));

        Thread.sleep(1000);

        ShadowLooper.runUiThreadTasks();
    }

    private List<Observable<String>> getObservables() {
        List<Observable<String>> observables = new ArrayList<>();
        final Random random = new Random();
        final AtomicInteger atomicInteger = new AtomicInteger(1);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            observables.add(Observable.fromCallable(() -> {
                //for second Observable there's a 2/3 probability that it won't succeed
                if (id == 1 && random.nextInt() % 3 != 0) {
                    throw new RuntimeException("Observable " + id + " failed!");
                }
                return "Observable #" + id + " returns " + atomicInteger.getAndIncrement();
            }));
        }

        return observables;
    }
}

Output

Observable 1 failed!

Observable 1 failed!

Observable 1 failed!

Observable #0 returns 1

Observable #1 returns 2

Observable #2 returns 3

As it can be seen from the example, after the second Observable finally succeeds each result is delivered in order.

Upvotes: 3

Related Questions