Reputation: 1795
I'm new to RxJava. I know that flatMap maps each emitted item to an Observable, and that, according to the documentation, the emitted Observables are all combined (flattened) into a single Observable stream.
What happens when one of those inner Observables completes?
For example: I have an Observable that emits item data keys. I have to make another asynchronous HTTP call to get the item data from the server, so I wrap that call in another Observable. I use flatMap to connect the two and create one main Observable.
When does the run() method in the following someMethodThatWantsItems get called?
public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
searchObservable
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Item>(){
@Override
public void accept(@NonNull Item item) throws Exception {
//Do stuff with the item
}
}
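, new Consumer<Throwable>() { //onError: RxJava 2 delivers a Throwable here, not an Exception
@Override
public void accept(@NonNull Throwable error) throws Exception {
//Handle the error
}
}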
//OnComplete
, new Action(){
@Override
public void run() throws Exception {
//When does this get called??? after the search complete or when the first http call is successful?
}
});
}
private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {
//Assume that our search engine calls onFind every time it finds something
searchEngine.addSearchListener(new searchEngineResultListener(){
@Override
public void onFind(String foundItemKey){
emitter.onNext(foundItemKey);
}
@Override
public void onFinishedFindingResults(){
emitter.onComplete();
}
});
}
});
}
private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{
return Observable.create(new ObservableOnSubscribe<Item>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {
//Call the server to get the item
httpCaller.call(key, new onCompleteListener(){
@Override
public void onCompletedCall(Item result)
{
emitter.onNext(result);
//The result is complete! end the stream
emitter.onComplete();
}
});
}
});
}
public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
//Where everything comes together
Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
return searchResultObservable
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, Observable<Item>>(){
@Override
public Observable<Item> apply(String key){
return getItemByKey(httpCaller, key);
}
});
}
Upvotes: 0
Views: 4012
Reputation: 10267
onComplete() is called at most once, and after it the stream stops emitting (this is part of the Observable contract). In your case, that means the onComplete() action in someMethodThatWantsItems will be called only after all items have been retrieved: once the search has finished and every HTTP call has delivered its item.
With flatMap(), the completion of an inner Observable does not complete the resulting stream. It only tells flatMap() to stop merging items from that particular inner Observable. flatMap() keeps merging items from an inner Observable for as long as it emits, so each inner stream is consumed in full, up to its termination event such as onComplete(). If an inner Observable can emit more than one item, it will produce more than one emission on the resulting stream.
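The ordering can be checked with a small, synchronous sketch. It assumes RxJava 2 (the io.reactivex package, matching the Consumer and Action types in the question) is on the classpath. The class name FlatMapCompletionDemo, the keys, and the "item-" strings are made up for illustration and stand in for the search engine and HTTP calls.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the question's code.
public class FlatMapCompletionDemo {

    // Records every signal so the ordering can be inspected afterwards.
    static List<String> run() {
        final List<String> events = new ArrayList<>();

        // Stand-in for getSearchResultKeys(): three keys, then onComplete.
        Observable.just("k1", "k2", "k3")
                // Stand-in for getItemByKey(): one item per key, then onComplete.
                .flatMap(key -> Observable.just("item-" + key)
                        .doOnComplete(() -> events.add("inner complete: " + key)))
                .subscribe(
                        item -> events.add("onNext: " + item),
                        error -> events.add("onError: " + error),
                        () -> events.add("outer onComplete"));

        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

Each inner Observable completes right after its single item, yet the subscriber's completion action is recorded only once, as the last event, after the key source and all three inner streams have finished. Everything here runs on the calling thread. With subscribeOn/observeOn as in the question, items from different inner streams may interleave, but the final onComplete should still come last.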
Upvotes: 3