Reputation: 3924
I have a case where I have multiple observables. Each observable has its own implementation; they may be of the same type or of different types (I haven't decided yet), but let's assume they're the same type.
Observable<String> source1;
Observable<String> source2;
Observable<String> source3;
Observable<String> source4;
What I need now is for only one of them to run: the stream should move to the next observable only if the previous one failed.
Some potential solutions:
onErrorResumeNext() may be good if there are only two observables, but in my case, if I need to change the order of execution, it will be hard to update each observable.
There is also combineLatest, but I don't know whether it behaves the way I described, or what modification would make it work as I need.
How can I achieve something like this, and what do I need to do if they have different types?
Upvotes: 1
Views: 492
Reputation: 480
You can get a combined observable using onErrorResumeNext and reduce, like this:
Observable<String> buildObservable(List<Observable<String>> observables) {
    return Observable.fromIterable(observables)
            .reduce(Observable::onErrorResumeNext)
            .flatMapObservable(obs -> obs);
}
UPDATE:
To explain further, if you call the method with a list [o1, o2, o3], then:
fromIterable will return a higher-level observable equivalent to just(o1, o2, o3).
reduce will combine the elements of this observable, sequentially calling onErrorResumeNext() with each element, like this:
o1 -> o1.onErrorResumeNext(o2) -> o1.onErrorResumeNext(o2).onErrorResumeNext(o3),
resulting in a still "higher-level" 1-element source (strictly a Maybe, since that is what reduce returns) that is equivalent to just(o1.onErrorResumeNext(o2).onErrorResumeNext(o3)).
The flatMapObservable() line will replace this 1-element source with its one and only element itself, which is o1.onErrorResumeNext(o2).onErrorResumeNext(o3) (without the just()).
This result implements the fallback mechanism you need.
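To make the fold easier to see in isolation, here is a minimal plain-Java analogy that does not use RxJava at all. It assumes a Supplier that throws stands in for an Observable that signals an error; FallbackFold, orElseTry and chain are hypothetical names made up for this sketch, not part of any library.

```java
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogy (no RxJava): a Supplier that throws plays the role of an
// Observable that signals onError. All names here are hypothetical.
class FallbackFold {

    // Analogue of first.onErrorResumeNext(next): try `first`, and only if it
    // throws, evaluate `next`.
    static <T> Supplier<T> orElseTry(Supplier<T> first, Supplier<T> next) {
        return () -> {
            try {
                return first.get();
            } catch (RuntimeException e) {
                return next.get();
            }
        };
    }

    // Analogue of fromIterable(...).reduce(Observable::onErrorResumeNext):
    // folds [s1, s2, s3] into orElseTry(orElseTry(s1, s2), s3).
    // Unlike the RxJava version, an empty list is rejected here.
    static <T> Supplier<T> chain(List<Supplier<T>> sources) {
        return sources.stream()
                .reduce(FallbackFold::orElseTry)
                .orElseThrow(() -> new IllegalArgumentException("no sources"));
    }

    public static void main(String[] args) {
        List<Supplier<String>> sources = List.of(
                () -> { throw new IllegalStateException("source1 failed"); },
                () -> "source2",
                () -> "source3");
        // source1 throws, so the chain falls through to source2; source3 never runs.
        System.out.println(chain(sources).get()); // prints "source2"
    }
}
```

Reordering the fallbacks then only means reordering the list, which is the flexibility the question asks for.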
Upvotes: 1
Reputation: 17439
I don't know if there is a better way to do it, but I would just use onErrorResumeNext() with the help of some methods to make it flexible:
Observable<String> buildObservable(Observable<String> obs, Observable<String>... subsequentObservables) {
    Observable<String> observable = obs;
    // Chain each subsequent observable as the fallback of everything before it.
    for (int i = 0; i < subsequentObservables.length; i++) {
        observable = concatErrorObservable(observable, subsequentObservables[i]);
    }
    return observable;
}
where concatErrorObservable is:
Observable<String> concatErrorObservable(Observable<String> observable, Observable<String> observable2) {
    return observable.onErrorResumeNext(observable2);
}
So you just need to pass the observables, in fallback order, to the buildObservable method. For example:
buildObservable(Observable.error(new Throwable("error!!")),
        Observable.just("observable2"),
        Observable.just("observable3"))
    .subscribe(s -> Log.d(TAG, "result: " + s));
This will log result: observable2 (in logcat), because the first observable signals an error and the second one takes over.
As for the different types, you will probably need a different map for each Observable, because I think your consumer (observer) will expect just one type of emitted data.
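As a rough illustration of the "map each source to a common type" idea, the following plain-Java sketch again uses Supplier as a stand-in for Observable (no RxJava involved). MixedTypeFallback, map, orElseTry and demo are hypothetical names, and converting everything to String is only an assumption made for the example.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java analogy (no RxJava); all names here are hypothetical.
class MixedTypeFallback {

    // Analogue of Observable.map: convert a source's value to the common type.
    static <A, B> Supplier<B> map(Supplier<A> source, Function<A, B> f) {
        return () -> f.apply(source.get());
    }

    // Analogue of onErrorResumeNext: use `next` only if `first` throws.
    static <T> Supplier<T> orElseTry(Supplier<T> first, Supplier<T> next) {
        return () -> {
            try {
                return first.get();
            } catch (RuntimeException e) {
                return next.get();
            }
        };
    }

    static String demo() {
        Supplier<Integer> numeric = () -> { throw new IllegalStateException("numeric source failed"); };
        Supplier<Double> decimal = () -> 2.5;

        // Each source gets its own mapping so that both end up as Supplier<String>.
        Supplier<String> first = map(numeric, n -> "int:" + n);
        Supplier<String> second = map(decimal, d -> "double:" + d);

        // Only after mapping can the two be chained as fallbacks of one type.
        return orElseTry(first, second).get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "double:2.5"
    }
}
```

The mapping has to happen before the sources are chained, since the fallback combinator needs both sides to share one element type.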
Upvotes: 1