Greg Rogoszewski
Greg Rogoszewski

Reputation: 93

RX merge multiple observables

In my snippet I am making an api request and everything works fine. I am now looking to check the response to see if the total number of items available on the server is greater than the returned result set defined by the pagesize. If it is I would like to make additional api calls until all results are retrieved and returned to the subscriber as one response. Which RX operator do I need to use to accomplish this and how can I pause the return of the response below until the subsequent api calls have been completed?

    getAction<T>(path: string, params?: {}): Observable<T> {
        return this._http.get("url")
            .map(res => {
                let response = res.json();
                // If more pages available make additional api calls & return as single result
                return response;
            });
    }

Upvotes: 0

Views: 610

Answers (2)

Konrad Garus
Konrad Garus

Reputation: 54045

Have a look at expand.

To fetch multiple pages of data recursively, you could do something like:

class MyExample {
  search(offset) {
    return this.http.get(`/search?offset=${offset}`);
  }

  searchAll() {
    return this.search(0)
               .expand(results => {
                 if (loadNextPage(results)) {
                   return this.search(results.nextPageOffset);
                 } else {
                   return Observable.empty();
                 }
               });
  }
}

expand allows you to do some processing based on previous results (like check if there are more pages), and specify an Observable with more results. The results of all these calls will be concatenated, not need to worry about carrying them over yourself.

Upvotes: 1

Bruno Jo&#227;o
Bruno Jo&#227;o

Reputation: 5545

You should use switchmap to get the response from another observable until you have all the data. Just concatenates all the responses and return it as the response in the last observable. Something like:

//emit immediately, then every 5s
const source = Rx.Observable.timer(0, 5000);

//switch to new inner observable when source emits, emit items that are emitted
const example = source.switchMap(() => Rx.Observable.interval(500));

//output: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));

Upvotes: 1

Related Questions