Mohamed Ibrahim

Reputation: 3924

RxJava: multiple Observables, execute the next Observable only if the previous one failed

I have several observables, each with its own implementation. They may all have the same type or different types; I haven't decided yet, but let's assume they have the same type.

Observable<String> source1;
Observable<String> source2;
Observable<String> source3;
Observable<String> source4;

What I need is to execute only one of them: the stream should move on to the next observable only if the previous one failed.

Some potential solutions:

How can I achieve something like this, and what do I need to do if they have different types?

Upvotes: 1

Views: 492

Answers (2)

Janos Breuer

Reputation: 480

You can get a combined observable using onErrorResumeNext and reduce like this:

Observable<String> buildObservable(List<Observable<String>> observables) {
    return Observable.fromIterable(observables)
            .reduce(Observable::onErrorResumeNext)
            .flatMapObservable(obs -> obs);
}

UPDATE: To explain further, if you call the method with a list [o1, o2, o3], then

  • the fromIterable will return a higher-level observable equivalent to just(o1, o2, o3)

  • the reduce will combine the elements of this observable, sequentially calling onErrorResumeNext() with each element, like this:

    o1 -> o1.onErrorResumeNext(o2) -> o1.onErrorResumeNext(o2).onErrorResumeNext(o3)

    resulting in a still "higher level" 1-element source (reduce returns a Maybe in RxJava 2) that is equivalent to just(o1.onErrorResumeNext(o2).onErrorResumeNext(o3)).

  • the flatMapObservable() line will replace this 1-element observable with its one and only element itself, which is o1.onErrorResumeNext(o2).onErrorResumeNext(o3) (without the just()).

This result implements the fallback mechanism you need.
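To make the shape of that fold easier to see, here is a minimal plain-Java analogy. It is not RxJava: each Callable stands in for a single-value observable, a thrown exception stands in for onError, and the names FallbackSketch, orElse and firstSuccessful are made up for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-Java stand-in for the RxJava chain (hypothetical names throughout):
// a Callable plays the role of one observable, an exception the role of onError.
final class FallbackSketch {

    // Analogue of first.onErrorResumeNext(next): run first, and only if it
    // throws, run next.
    static <T> Callable<T> orElse(Callable<T> first, Callable<T> next) {
        return () -> {
            try {
                return first.call();
            } catch (Exception e) {
                return next.call();
            }
        };
    }

    // Analogue of fromIterable(...).reduce(...): a left fold that turns
    // [s1, s2, s3] into orElse(orElse(s1, s2), s3).
    static <T> Callable<T> firstSuccessful(List<Callable<T>> sources) {
        return sources.stream()
                .reduce(FallbackSketch::orElse)
                .orElseThrow(() -> new IllegalArgumentException("no sources"));
    }

    public static void main(String[] args) throws Exception {
        Callable<String> source1 = () -> { throw new IllegalStateException("source1 failed"); };
        Callable<String> source2 = () -> "source2";
        Callable<String> source3 = () -> "source3";
        // source1 fails, so the chain falls back to source2; source3 never runs.
        System.out.println(firstSuccessful(Arrays.asList(source1, source2, source3)).call());
    }
}
```

As in the RxJava version, nothing runs while the chain is being built; the sources are only tried, in order, when the combined result is finally requested.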

Upvotes: 1

GVillani82

Reputation: 17439

I don't know if there is a better way to do it, but I would just use onErrorResumeNext(), with a couple of helper methods to make it flexible:

Observable<String> buildObservable(Observable<String> obs, Observable<String>... subsequentObservables) {
    Observable<String> observable = obs;
    for (int i = 0; i < subsequentObservables.length; i++) {
        observable = concatErrorObservable(observable, subsequentObservables[i]);
    }

    return observable;
}

where concatErrorObservable is:

Observable<String> concatErrorObservable(Observable<String> observable, Observable<String> observable2) {
    return observable.onErrorResumeNext(observable2);
}

So you just need to pass the observables to the buildObservable method, in fallback order. For example:

buildObservable(Observable.error(new Throwable("error!!")), 
    Observable.just("observable2"), 
    Observable.just("observable3"))
.subscribe(s -> Log.d(TAG, "result: " + s));

This logs "result: observable2" to logcat, because the first observable emits an error and the second one takes over.

About the different types: you probably need a separate map for each Observable that converts its items to one common type, because I think your consumer (observer) will expect just one type of emitted data.
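A rough plain-Java sketch of that idea follows, again with Callable standing in for an observable. The class MixedTypeFallbackSketch and the helpers map and orElse are hypothetical; in RxJava the corresponding step would be a map call on each Observable before chaining with onErrorResumeNext.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Plain-Java analogy (hypothetical names): sources of different types are
// first converted to one common type, then chained as fallbacks.
final class MixedTypeFallbackSketch {

    // Analogue of Observable.map: convert a source's value to the common type.
    static <A, B> Callable<B> map(Callable<A> source, Function<A, B> f) {
        return () -> f.apply(source.call());
    }

    // Analogue of first.onErrorResumeNext(next).
    static <T> Callable<T> orElse(Callable<T> first, Callable<T> next) {
        return () -> {
            try {
                return first.call();
            } catch (Exception e) {
                return next.call();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> intSource = () -> { throw new IllegalStateException("intSource failed"); };
        Callable<Double> doubleSource = () -> 2.5;

        // Both sources are mapped to String so the consumer sees a single type.
        Callable<String> chain = orElse(
                map(intSource, n -> "value: " + n),
                map(doubleSource, d -> "value: " + d));

        System.out.println(chain.call()); // prints "value: 2.5"
    }
}
```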

Upvotes: 1
