Stefan Zhelyazkov

Reputation: 2981

Wait for Observable to complete within an Observable

I have the following data structure:

export class Repo {
    id: number;
    name: string;
    contributors: Contributor[];
}

export class Contributor {
    id: number;
    login: string;
}

I am using Observable<Repo> to fetch all repo data but I want to internally call another observable Observable<Contributor> to populate all contributors before the outer Repo is considered fully emitted. I am not sure how to do this. I have the following code.

private repos: Repo[] = [];

getRepos(orgName: string): void {
    const repoBuild: Repo[] = [];
    this.githubService.getRepos(orgName).pipe(
        // this here won't wait for all contributors to be resolved
        map(repo => this.getRepoContributors(orgName, repo))
    ).subscribe(
        repo => repoBuild.push(repo),
        error => {
            this.repos = [];
            console.log(error);
        },
        () => this.repos = repoBuild
    );
}

// fetches contributors data for the repo
private getRepoContributors(orgName: string, repo: Repo): Repo {
    const contributors: Contributor[] = [];
    repo.contributors = contributors;
    this.githubService.getRepoContributors(orgName, repo.name)
        .subscribe(
            contributor => {
                // add to the total collection of contributors for this repo
                contributors.push(contributor);
            },
            error => console.log(error),
            () => repo.contributors = contributors
        );
    return repo;
}

I admit that my understanding of Observables is limited, and I have struggled with this for quite a few hours. I tried finding something that works for me on StackOverflow, but I still could not find a working solution. Any help would be appreciated!

(The code is written in Angular 5)

Solution:

I used @joh04667's suggestion below and eventually got it to work. Here is how I did it:

getRepos(orgName: string): void {
    const repoBuild: Repo[] = [];
    this.githubService.getRepos(orgName).pipe(
        // mergeMap() replaces `repo` with the result of the observable from `getRepoContributors`
        mergeMap(repo => this.getRepoContributors(orgName, repo))
    ).subscribe(
        repo => repoBuild.push(repo),
        error => {
            this.repos = [];
            console.log(error);
        },
        () => this.repos = repoBuild
    );
}

// fetches contributors data for the repo
private getRepoContributors(orgName: string, repo: Repo): Observable<Repo> {
    repo.contributors = []; // make sure each repo has an empty array of contributors
    return this.githubService.getRepoContributors(orgName, repo.name).pipe(
        // tap() allows us to peek on each contributor and add them to the array of contributors
        tap(contributor => {
            // add to the total collection of contributors for this repo
            repo.contributors.push(contributor);
        }),
        // the predicate rejects every contributor, so on completion last() emits the default value (the repo)
        last(
            () => false,
            () => repo,
            repo
        )
    );
}

In the last part, where I use last(), I basically tell the Observable that even though it will process all values, it should emit only once, when the source completes. Because the predicate () => false rejects every Contributor, last() falls back to its default value (the repo). That changes the return type from Observable<Contributor> to Observable<Repo>, which is exactly what I need for the higher-level Observable.
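To make the last() behaviour easier to see in isolation, here is a minimal sketch with plain numbers instead of contributors. It assumes RxJS 6 or later, where the resultSelector argument no longer exists and the signature is last(predicate, defaultValue); the names seen and emitted are only for illustration.

```typescript
import { from } from 'rxjs';
import { last, tap } from 'rxjs/operators';

const seen: number[] = [];                      // every value that passed through tap()
const emitted: Array<number | string> = [];     // what the subscriber actually received

from([1, 2, 3]).pipe(
    // tap() observes each source value without changing the stream
    tap(n => seen.push(n)),
    // the predicate never matches, so on completion last() emits the default value
    last(() => false, 'done')
).subscribe(value => emitted.push(value));

console.log(seen);     // all three source values were processed
console.log(emitted);  // only the default value reached the subscriber
```

All source values are processed, but the subscriber receives a single emission, and only after the source has completed.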

Upvotes: 3

Views: 10634

Answers (1)

joh04667

Reputation: 7427

This is a good question, and to me this is the 'big step' to really understanding the full power of Observables: higher-order Observables, or Observables that emit Observables.

Your situation is perfect for the mergeMap / flatMap operator:

getRepos(orgName: string): void {
    const repoBuild: Repo[] = [];
    this.githubService.getRepos(orgName).pipe(
        // this will map the emissions of getRepos to getRepoContributors and return a single flattened Observable
        mergeMap(repo => this.getRepoContributors(orgName, repo))
    ).subscribe(
        repo => repoBuild.push(repo),
        error => {
            this.repos = [];
            console.log(error);
        },
        () => this.repos = repoBuild
    );
}

mergeMap maps each emission of an outer Observable (getRepos) to an inner Observable (getRepoContributors), subscribes to it, and returns a new Observable that forwards every value the inner Observables emit. In other words, it flattens values passed from one Observable to another into a single subscribable data stream.
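As a rough, self-contained illustration of that flattening, the sketch below replaces the GitHub service with in-memory data. It assumes RxJS 6 or later; repoData, contributorData and the two free functions are hypothetical stand-ins, not the real service.

```typescript
import { from, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface Contributor { id: number; login: string; }
interface Repo { id: number; name: string; contributors: Contributor[]; }

// Hypothetical in-memory data standing in for the GitHub service.
const repoData: Repo[] = [
    { id: 1, name: 'alpha', contributors: [] },
    { id: 2, name: 'beta', contributors: [] },
];
const contributorData: { [repoName: string]: Contributor[] } = {
    alpha: [{ id: 10, login: 'ann' }, { id: 11, login: 'bob' }],
    beta: [{ id: 12, login: 'cy' }],
};

function getRepos(): Observable<Repo> {
    return from(repoData);
}

function getRepoContributors(repoName: string): Observable<Contributor> {
    return from(contributorData[repoName] || []);
}

const logins: string[] = [];
getRepos().pipe(
    // each repo is replaced by the contributors its inner Observable emits
    mergeMap(repo => getRepoContributors(repo.name))
).subscribe(contributor => logins.push(contributor.login));

console.log(logins); // ann, bob, cy: one flat stream of contributors
```

Note that the flattened stream carries contributors, not repos. To emit one Repo per inner Observable, the inner stream has to be reduced to a single value before it leaves the mergeMap callback.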

Higher-order Observables can be hard to wrap your head around, but that is really where the true power of Observables lies. I highly recommend checking out some other operators like switchMap and concatMap on the site I linked as well. Observables can get crazy powerful when used to their fullest extent.
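To give a feel for how these operators differ, here is a small sketch comparing mergeMap with concatMap. It assumes RxJS 6 or later and uses two Subjects as hand-driven inner Observables so the timing is deterministic; run and its parameter are hypothetical names.

```typescript
import { of, Subject } from 'rxjs';
import { concatMap, mergeMap } from 'rxjs/operators';

function run(flatten: 'merge' | 'concat'): string[] {
    const a = new Subject<string>();
    const b = new Subject<string>();
    const out: string[] = [];
    const project = (key: string) => (key === 'a' ? a : b);

    const source = of('a', 'b');
    const result = flatten === 'merge'
        ? source.pipe(mergeMap(project))    // subscribes to a and b right away
        : source.pipe(concatMap(project));  // subscribes to b only after a completes
    result.subscribe(value => out.push(value));

    b.next('b1');   // concatMap has not subscribed to b yet, so it misses this value
    a.next('a1');
    a.complete();
    b.next('b2');
    b.complete();
    return out;
}

console.log(run('merge'));   // b1, a1, b2: inner streams interleave
console.log(run('concat'));  // a1, b2: b is only consumed once a has completed
```

Subjects are hot, which is why concatMap misses b1 here. With a cold source such as an HTTP request, the second request would simply start later instead of losing data.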

EDIT

I misread the original code and thought getRepoContributors was returning an Observable. Long work day had me fried. I'll refactor here:

map is good for altering the value before merging it with mergeMap. The first couple lines of getRepoContributors can be done there. Since mergeMap needs an Observable returned to it, we can simplify a bit:

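import { map, mergeMap, toArray } from 'rxjs/operators';

private repos: Repo[] = [];

getRepos(orgName: string): void {
    const repoBuild: Repo[] = [];
    this.githubService.getRepos(orgName).pipe(
        // start every repo with an empty contributors array
        map(repo => {
            repo.contributors = [];
            return repo;
        }),
        // pipe inside the mergeMap callback so that `repo` stays in scope
        mergeMap(repo => this.githubService.getRepoContributors(orgName, repo.name).pipe(
            // collect all contributors; emits once, when the inner Observable completes
            toArray(),
            // swap the contributor array for the populated repo
            map(contributors => {
                repo.contributors = contributors;
                return repo;
            })
        ))
    ).subscribe(
        repo => repoBuild.push(repo),
        error => {
            this.repos = [];
            console.log(error);
        },
        () => this.repos = repoBuild
    );
}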

// don't need second function anymore

Operators in a pipe run in sequence, so each operator receives whatever value the previous one emitted, and we can keep reshaping that value step by step.

Upvotes: 4
