Reputation: 109
public Observable<List<A>> buildObservable(final String bId) {
    final Observable<List<B>> bObs = bRepository.getBs(bId);
    return Observable.create(new ObservableOnSubscribe<List<A>>() {
        @Override
        public void subscribe(final ObservableEmitter<List<A>> subscriber) throws Exception {
            bObs.subscribe(new DefaultObserver<List<B>>() {
                @Override
                public void onNext(final List<B> l_B) {
                    if (l_B.isEmpty()) {
                        subscriber.onNext(new LinkedList<A>());
                        subscriber.onComplete();
                    } else {
                        final List<A> l_A = new LinkedList<A>();
                        for (B b : l_B) {
                            Observable<A> aObs = aRepository.getbyB(b);
                            aObs.subscribe(new DefaultObserver<A>() {
                                @Override
                                public void onNext(A a) {
                                    l_A.add(a);
                                    if (l_A.size() == l_B.size()) {
                                        subscriber.onNext(l_A);
                                        subscriber.onComplete();
                                    }
                                }
                            });
                        }
                    }
                }
            });
        }
    });
}
I find myself having to implement the above pattern over and over again. I have an observable, bObs, whose subscriber triggers a set of observables that depend on the value bObs emits. Once they have all returned, the top-level observable returned by the function emits a value.
I feel like there should be an easier way to do this, since this pattern is probably pretty common. Is there some kind of composite operator that helps me take aObs and bObs and construct the top-level observable?
Upvotes: 0
Views: 88
Reputation: 4077
As a rule of thumb, avoid creating observables yourself with .create() as much as possible, because there is almost always a shorter, less error-prone standard alternative such as .just(), .empty(), .from() (.fromIterable() in RxJava 2), etc.
Secondly, you are right that using imperative flow control such as if/for in Rx feels awkward: Rx is built on the continuation-passing style (CPS) concept, so the operators themselves are the flow control.
So in your case it will be something like:
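bObs
    .flatMapIterable(l_B -> l_B)             // Observable<List<B>> -> Observable<B>
    .flatMap(b -> aRepository.getbyB(b))     // one Observable<A> per b, merged
    .toList()                                // emits an empty list if there were no results
    .toObservable();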
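To see why the explicit isEmpty() branch and the size-counting loop become unnecessary, here is a minimal stdlib-only analogy using java.util.stream, whose flatMap and collect steps have the same shape as the Rx operators. It is only a sketch: the class ComposeSketch and the method getByB are hypothetical stand-ins for aRepository.getbyB(b), and a stream is synchronous, so it does not model asynchrony. It also preserves order, which Rx flatMap does not guarantee (concatMap does).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ComposeSketch {
    // Hypothetical stand-in for aRepository.getbyB(b): one result per input here.
    public static Stream<String> getByB(int b) {
        return Stream.of("a" + b);
    }

    // flatMap fans each b out to its results; collect gathers them into one list.
    // An empty input falls through the same pipeline and yields an empty list.
    public static List<String> collectAs(List<Integer> bs) {
        return bs.stream()
                .flatMap(ComposeSketch::getByB)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectAs(Arrays.asList(1, 2, 3)));            // [a1, a2, a3]
        System.out.println(collectAs(Collections.<Integer>emptyList())); // []
    }
}
```

The empty case needs no special handling because the pipeline simply has nothing to map, which is the same reason toList() can replace the manual bookkeeping in the Rx version.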
Upvotes: 0
Reputation: 109
Adapting Maxim's proposal above, I was able to find a solution that I am happy with:
return bRepository
    .getBs(bId)
    .defaultIfEmpty(new ArrayList<B>())
    .flatMap(Observable::fromIterable)
    .flatMap(b -> aRepository.getbyB(b))
    .toList()
    .toObservable();
Upvotes: 0