Reputation: 1901
I have a method that creates and merges two Observable<T>
's.
private Observable<String> getData() {
Observable<String> observable1 = Observable.just("Just string")
.delay(2, TimeUnit.SECONDS);
Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onError(new Exception("Oops!"));
}
});
Observable<String> merged = observable1.mergeWith(observable2);
return merged;
}
The problem is that the error in the observable2
emitted earlier than the data in the observable1
.
getData().subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("rx", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("rx", "onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
Log.d("rx", "onNext: " + s);
}
});
observable2
emitted optional data and errors from observable2
must not terminate emission, but on the other hand, I want to handle all errors and data in a single observer. Can I return this error after the result from the observable1
? Is that possible?
Expect results:
onNext: Just string
onError: Oops!
Upvotes: 2
Views: 255
Reputation: 15824
Sounds like you need something like Observable.mergeDelayError()
.
It applies a merge
operation but if some of the Observables call onError
, it doesn't stop right away but captures the error(s).
Once all other Observables have gracefully completed, the error(s) is(are) propagated, potentially combined into a single CompositeException
.
Upvotes: 3