NavidM

Reputation: 1795

RxJava flatMap: what happens when one of the resulting Observables completes?

I'm new to RxJava. I know that flatMap maps each emitted item to an Observable, and that, according to the documentation, the resulting Observables are all combined (flattened) into a single Observable stream.

I was wondering what happens when one of those inner Observables completes.

For example: I have an Observable that emits item data keys. For each key I have to make another async HTTP call to get the item data from the server, so I wrap that call in another Observable. I use flatMap to connect the two and create one main Observable.

When does the run() method in the following someMethodThatWantsItems get called?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
    searchObservable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Consumer<Item>(){
                           @Override
                           public void accept(@NonNull Item item) throws Exception {
                               //Do stuff with the item
                           }
                       }
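                //onError (RxJava 2 passes a Throwable to the error consumer)
                , new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable error) throws Exception {
                            //Handle the error
                        }
                    }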
                 //OnComplete
                , new Action(){

                        @Override
                        public void run() throws Exception {
                            //When does this get called? After the whole search completes, or as soon as the first HTTP call succeeds?
                        }
                    });

}

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {

            //Assume that our search engine calls onFind every time it finds something
            searchEngine.addSearchListener(new searchEngineResultListener(){
                @Override
                public void onFind(String foundItemKey){
                    emitter.onNext(foundItemKey);
                }

                @Override
                public void onFinishedFindingResults(){
                    emitter.onComplete();
                }
            });

        }
    });
}

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{

    return Observable.create(new ObservableOnSubscribe<Item>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {

            //Call the server to get the item
            httpCaller.call(key, new onCompleteListener(){
                @Override
                public void onCompletedCall(Item result)
                {
                    emitter.onNext(result);
                    //The result is complete! end the stream
                    emitter.onComplete();
                }
            });
        }
    });
}

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
    //Where everything comes together
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
    return searchResultObservable
            .observeOn(Schedulers.newThread())
            .flatMap(new Function<String, Observable<Item>>(){
                @Override
                public Observable<Item> apply(String key){
                    return getItemByKey(httpCaller, key);
                }
            });
}

Upvotes: 0

Views: 4012

Answers (1)

yosriz

Reputation: 10267

onComplete() is called at most once, after which the stream stops (this is part of the Observable Contract).
That means that in your case, the onComplete() in someMethodThatWantsItems will be called after all items have been retrieved, not when the first HTTP call succeeds.
With flatMap(), the completion of an inner Observable simply tells flatMap() to stop merging items from that inner Observable into the output stream; it is not forwarded downstream. flatMap() keeps merging items from an inner Observable for as long as it emits, that is, until it terminates with an event such as onComplete(). So if an inner Observable emits more than one item, it produces more than one emission on the output stream. The output stream itself completes only once the source Observable and every inner Observable have completed.
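To make the completion rule concrete, here is a rough, single-threaded model of the bookkeeping involved. This is not RxJava's implementation: the class FlatMapCompletionModel and all of its method names are made up for illustration, and it ignores errors, threading and disposal. It only shows that onComplete goes downstream once the outer (key) stream is done and no inner stream is still active.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of when a flatMap-style merge completes.
 * Hypothetical class for illustration only; not part of RxJava.
 */
public class FlatMapCompletionModel {
    private final List<String> events = new ArrayList<>();
    private int activeInners = 0;        // inner streams subscribed but not yet completed
    private boolean sourceDone = false;  // has the outer (key) stream completed?
    private boolean completed = false;   // downstream onComplete is delivered at most once

    /** The outer stream emitted a key, so one more inner stream becomes active. */
    public void sourceEmitsKey(String key) {
        if (!sourceDone) {
            activeInners++;
        }
    }

    /** An inner stream emitted an item; it is passed straight downstream. */
    public void innerEmits(String item) {
        if (!completed) {
            events.add("onNext " + item);
        }
    }

    /** An inner stream completed; this alone does not complete the output. */
    public void innerCompletes() {
        activeInners--;
        completeIfDone();
    }

    /** The outer stream completed; inner streams may still be in flight. */
    public void sourceCompletes() {
        sourceDone = true;
        completeIfDone();
    }

    private void completeIfDone() {
        if (!completed && sourceDone && activeInners == 0) {
            completed = true;
            events.add("onComplete");
        }
    }

    /** Everything the downstream subscriber has seen so far, in order. */
    public List<String> events() {
        return events;
    }

    public static void main(String[] args) {
        FlatMapCompletionModel m = new FlatMapCompletionModel();
        m.sourceEmitsKey("k1");
        m.sourceEmitsKey("k2");
        m.innerEmits("item1");
        m.innerCompletes();   // first HTTP call finished: no onComplete yet
        m.sourceCompletes();  // search finished, but the call for k2 is still running
        m.innerEmits("item2");
        m.innerCompletes();   // last inner finished: onComplete is recorded now
        for (String e : m.events()) {
            System.out.println(e);
        }
    }
}
```

In the main method above, "onComplete" is recorded only after the second innerCompletes() call, which corresponds to the run() action in the question firing after the search has finished and the last HTTP call has returned.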

Upvotes: 3
