I have code like this:
Process X:
getLocationObservable() // ---> async operation that fetches the location.
    // Once the location is found (or fails to be found), it is passed to this filter:
    .filter(location -> {
        // --- Operation A ---
        // After Operation A finishes, I either return 'true' and continue
        // to the next observable, which is a Retrofit server call, or simply
        // return 'false' and quit.
    })
    .flatMap(location -> getRetrofitServerCallObservable( location )
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        new Observer<MyCustomResponse>() {
            @Override
            public void onSubscribe(Disposable d) {
                _disposable = d;
            }
            @Override
            public void onNext(MyCustomResponse response) {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
The Location class looks like this:
private PublishSubject<Location> locationPublishSubject = PublishSubject.create();
public Observable<Location> getLocationObservable() {
    return locationPublishSubject;
}
and then...
locationPublishSubject.onNext( foundLocation );
Problems with PublishSubject:
locationPublishSubject.onError( new <some mock exception> ) --> it crashes with io.reactivex.exceptions.UndeliverableException.
locationPublishSubject.onComplete() after onNext --> the onNext doesn't happen. It jumps straight to onComplete.
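To make the second symptom easier to reason about, here is a minimal plain-Java sketch. It does not use RxJava: `MiniPublishSubject` and `SimpleObserver` are hypothetical stand-ins that model only the documented PublishSubject rules (values go to observers that are subscribed at that moment, and a late subscriber still receives the terminal event). It assumes that the observer is attached only after onNext has already been called, which is one possible cause and not a confirmed diagnosis of the code above.

```java
import java.util.ArrayList;
import java.util.List;

public class LateSubscriberDemo {

    /** Hypothetical observer, reduced to the two callbacks discussed here. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Hypothetical stand-in modelling only PublishSubject's delivery rules. */
    static final class MiniPublishSubject<T> {
        private final List<SimpleObserver<T>> observers = new ArrayList<>();
        private boolean completed;

        void subscribe(SimpleObserver<T> observer) {
            if (completed) {
                // A late subscriber still receives the terminal event.
                observer.onComplete();
            } else {
                observers.add(observer);
            }
        }

        void onNext(T value) {
            if (completed) {
                return;
            }
            // Only observers subscribed right now see the value; nothing is cached.
            for (SimpleObserver<T> o : new ArrayList<>(observers)) {
                o.onNext(value);
            }
        }

        void onComplete() {
            if (completed) {
                return;
            }
            completed = true;
            for (SimpleObserver<T> o : new ArrayList<>(observers)) {
                o.onComplete();
            }
            observers.clear();
        }
    }

    /** Records the callbacks an observer sees, subscribing before or after the emissions. */
    static List<String> run(boolean subscribeFirst) {
        List<String> events = new ArrayList<>();
        MiniPublishSubject<String> subject = new MiniPublishSubject<>();
        SimpleObserver<String> observer = new SimpleObserver<String>() {
            @Override public void onNext(String v) { events.add("onNext(" + v + ")"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        if (subscribeFirst) {
            subject.subscribe(observer);
        }
        subject.onNext("foundLocation");
        subject.onComplete();
        if (!subscribeFirst) {
            subject.subscribe(observer);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // observer attached before onNext
        System.out.println(run(false));  // observer attached after onNext and onComplete
    }
}
```

In this model, an observer that subscribes first sees both callbacks, while one that subscribes late sees only onComplete. Whether the real chain actually subscribes late (for example because of subscribeOn) is something I have not verified.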