user15570950
user15570950

Reputation:

How to complete a subscriber when a for loop is done?

If you make a GET request to https://api.github.com/users/benawad/repos you'll get all the repositores that the user benawad has. At least that's what that API endpoint is expected to return. The problem is that GitHub limits the number of repositores to 30.

enter image description here

As of today (14/08/2021), the user benawad has 246 repositores.

To overcome the above mentioned problem you should make a GET request but with some additional params... GitHub has implemented pagination to it's API. So, in the URL you should specify the page you want to retrieve and the amount of repositores per page.

Our new GET request should look something like this:

https://api.github.com/users/benawad/repos?page=1&per_page=1000

The problem with this, is that GitHub has limited the amount of repositores per page to a 100. So our GET request returns a 100 repos and not the 246 that the user benawad currrently has.

enter image description here

In my Angular service I have implemented the following code to retrieve all repos from all the pages.

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        //public_repos is the amt of repos the user has
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
            });
        }
      });
    });
  }

And in my component I subscribe with the following:

  this.userService.getUserRepos(id).subscribe((repos)=>{
    this.repositories.push(...repos);
  })

The problem with this aproach is that I have no control of when the Observable has stopped emitting values. In my component I would like to trigger a function when the Observable is complete.

I've tried the following:

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
              // If the for loop is complete -> complete the subscriber
              if(pages == i) subscriber.complete();
            });
        }
      });
    });
  }

And in my component I do the following:

  this.userService.getUserRepos(id).subscribe(
    (repos) => {
      this.repositories.push(...repos);
    },
    (err) => {
      console.log(err);
    },
    () => {
      // When the observable is complete:
      
      console.log(this.repositories); // This only logs 46 repositores, it should log 246
      // I want to trigger some function here
    }
  );

The console.log() only logs 46 repositores. Why is this happening? Maybe im completing the subscriber before it can fetch all 3 pages; but I'm calling the .complete() inside the subscription. What am I doing wrong? Thanks in advance.

Upvotes: 7

Views: 597

Answers (5)

Joshua McCarthy
Joshua McCarthy

Reputation: 1852

Attempting to provide a more straightforward answer. If you don't like the nesting, you can always declare an inner observable as a separate method.

public getUserRepos = (user:string) =>
  this.getUserData(user)
  .pipe(
    map(({public_repos})=>Math.ceil(public_repos / 100)),
    switchMap(max=>interval(1)
    .pipe(
      takeWhile(i=>++i<=max)
      mergeMap(i=>this.http.get<RepositoryI[]>(`https://api.github.com/users/${user}/repos?page=${++i}&per_page=100`))
    )),
    scan((acc, repo)=>[...acc, ...repo], [] as RepositoryI[])
  );

Here's the strategy I used:

  1. Get the page limit (using object destructuring) from getUserData() like what you already have.
  2. Use interval() to start a counting observable (starts with 0 hence the use of ++ later on).
  3. Use takeWhile() to check if the counter has exceeded the page limit, otherwise the observable completes here.
  4. Use mergeMap() to queue a series of API calls for each number emitted from interval().
  5. Reduce all the responses into a single array response with scan().

Update If you test this method, you'll need to update your component logic.

Because scan() reduces all API requests to a single observable, you can assign these values to your state array instead of pushing them.

This observable will emit immediately after the first API request. As each additional API request is completed, the observable will emit a new array with the added items. This is done so your component doesn't have to wait until all API requests are complete before it can display some data.

Upvotes: 0

Amer
Amer

Reputation: 6716

You can achieve that by one subscription using RxJS operators and functions, like the following:

public getUserRepos(user: string): Observable<RepositoryI[]> {
  // will return one observable that contains all the repos after concating them to each other:
  return this.getUserData(user).pipe(
    switchMap((data: UserI) => {
      //public_repos is the amt of repos the user has
      const pages: number = Math.ceil(data.public_repos / 100);

      // forkJoin will emit the result as (RepositoryI[][]) once all the sub-observables are completed:
      return forkJoin(
        Array.from(new Array(pages)).map((_, page) =>
          this.http.get<RepositoryI[]>(
            `https://api.github.com/users/${user}/repos?page=${page + 1}&per_page=100`
          )
        )
      ).pipe(
        // will reduce the RepositoryI[][] to be RepositoryI[] after concating them to each other:
        map((res) =>
          res.reduce((acc, value) => {
            return acc.concat(value);
          }, [])
        )
      );
    })
  );
}

Then in your component, you can subscribe to the observable that will return all the repos after fetching all of them:

  this.userService.getUserRepos(id).subscribe((repos) => {
    this.repositories = repos;
    // You can trigger the function that you need here...
  });

Upvotes: 4

Aviad P.
Aviad P.

Reputation: 32670

Summary

I will describe two ways to achieve this, one is imperative (using a loop) and the other reactive (using rxjs)

Imperative

You could do a simple loop to get all the pages (this one returns a promise, but if needed you can convert that to an observable using from):

public getUserRepos(user: string): Promise<Repository[]> {
  let pageNumber = 0;
  let results = [];

  const getPage = pageNumber =>
    this.http
      .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
      .toPromise();

  do {
    const pageResults = await getPage(pageNumber++);
    results = results.concat(pageResults);
  } while(pageResults.length !== 0);

  return results;
}

Reactive

You could use rxjs's expand operator to query a page, then the next page, etc.

The usage requires that you map the http result to an object that also includes the page number, that way expand can know which page to get next.

Example in your case:

public getUserRepos(user: string): Observable<Repository[]> {
  const getPage = pageNumber =>
    this.http
      .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
      .pipe(map(z => ({ results: z, page: pageNumber }))); // Add page number to results

  return getPage(0).pipe(
    expand(pr => getPage(pr.page + 1)),
    takeWhile(pr => pr.results.length !== 0),
    map(pr => pr.results),
    scan((acc, v) => [...acc, ...v])
  );
}

The result would be a query for page 0, then when it resolves, a query for page 1, etc. when a page results in 0 entries, the observable will complete.

I used scan to provide partial results as the pages are fetched, that way you can see something happening, instead of waiting for the all the pages to resolve before showing anything.

Example StackBlitz

Note: There will be an extra query for a page one after the last which will be cancelled, it is harmless, but if you want to avoid that, change your expand call to this:

expand(pr => (pr.results.length !== 0 ? getPage(pr.page + 1) : EMPTY))

Upvotes: 0

Piyush Jain
Piyush Jain

Reputation: 147

Javascript runs things asynchronously.

Your 'For loop' will run api in parallel and increment the count at the same time.

As you are making 3 parallel api calls those may return in different duration.

But before that your 'i' value will reach 3.

So in your case possibility is your 3rd api call which has less records (46 records) is returning before the first 2 api calls and is going to the subscribe logic first and you are seeing 46 records only.

              subscriber.next(data); // 3rd api comes here 
              
              if(pages == i) subscriber.complete();  // now your pages and i values are same and it gets completed.
            });

SOLUTION


import {defer } from 'rxjs';

defer(async () => {
     for (let i = 1; i <= pages; i++) {
          await this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .toPromise().then((data: RepositoryI[]) => {
              subscriber.next(data);
            });
        }
}.subscribe(x => { subscriber.complete(); });

Upvotes: 0

vsnikhilvs
vsnikhilvs

Reputation: 576

          let subs = interval(1000).subscribe(() => {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
              if(!data) { // to check if data is null
                subs = null;  // if null, cancel the interval subscription
                return;  // and return (not continue the subscription)
              }
            });
          })
        }

This way you store the subscription and after checking if there is data, you unsubscribe(stop).

Please let me know if this works.

Upvotes: 0

Related Questions