skyleguy

Reputation: 1151

Use RxJS to keep triggering HTTP calls in Angular until a condition is met

I am making calls to the Spotify API, which returns an object like this:

{
  next: 'https://api.spotify.com/v1/me/tracks?offset=100&limit=50',
  items: [...]
}

where items holds the results of the network call just made and next is the URL to use to get the next 'page' of results. What I would like to do is make an initial call with no offset and then keep making network calls until next is null, which means the user has no more items left to get.

This seems possible, but I can't figure out how to do it correctly. What I am looking for is something like this:

  readonly baseSpotifyUrl: string = 'https://api.spotify.com/v1';

  constructor(private http: HttpClient) {}

  public getTracks(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
  }

  public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    let next: string = '';
    this.getTracks(next)
      .subscribe({
        next: (res: any): void => {
          tracks$.next(res.items);
          next = res.next;
          // if res.next !== null, do this again now that we have set next to res.next
        },
        error: (err: any): void => {
          console.error(err);
        }
      });
    return tracks$;
  }

The idea here is that my component will call getAllTracks() and receive a subject, and new items will keep being pushed through that subject until all the items have been retrieved. I cannot figure out how to make a new network request when the previous one returns, but only if there are more items to get (res.next !== null).

EDIT-----------------------------------------------------------

This gets the job done, but I feel that it's trash:

  public getAllTracksSegment(itemsSubject: Subject<any>, nextSubject: Subject<string>, url?: string): void {
    this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`).subscribe({
      next: (res: any): void => {
        itemsSubject.next(res.items);
        nextSubject.next(res.next);
      }
    });
  }

  public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    const next$: Subject<string> = new Subject();
    next$.subscribe({
      next: (next: any): void => {
        if (next !== null) {
          this.getAllTracksSegment(tracks$, next$, next);
        }
      }
    });
    next$.next('');
    return tracks$;
  }

Upvotes: 5

Views: 5023

Answers (4)

Jorge Mussato

Reputation: 2514

I came up with this solution using RxJS. I hope it helps; let me know if it works.

expand performs the recursive requests, and takeWhile lets them continue only while next isn't null. reduce collects all your tracks into a single array, so when you subscribe you receive all the tracks at once.

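  getAllTracks(): Observable<any> {
    return this.getAllTracksSegment().pipe(
      expand((res: any) => this.getAllTracksSegment(res.next)),
      // inclusive = true, so the last page (whose next is null) is still emitted before completing
      takeWhile((res) => !!res.next, true),
      // the API returns the contents of each page in `items`
      reduce((total, current) => total.concat(current.items), [] as any[])
    ); // All tracks available when you subscribe
  }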

  getAllTracksSegment(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
  }

PS: If you want to emit partial results, just remove the reduce operator and each page will be emitted as it arrives.
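To illustrate the "partial results" variant, here is a minimal sketch that emits one array of items per page instead of a single accumulated array. It is not the code from the question: the Page interface, the fakePages data, and the getSegment and streamTracks names are made up for the example, and of() stands in for the real http.get call. Note that this sketch ends the recursion by returning EMPTY rather than with takeWhile.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, map } from 'rxjs/operators';

// Hypothetical page shape: only `next` and `items` come from the question.
interface Page {
  next: string | null;
  items: string[];
}

// Simulated backend standing in for http.get (three invented pages).
const fakePages: Record<string, Page> = {
  first: { next: 'second', items: ['a', 'b'] },
  second: { next: 'third', items: ['c', 'd'] },
  third: { next: null, items: ['e'] },
};

// Hypothetical stand-in for getAllTracksSegment(url).
function getSegment(url: string = 'first'): Observable<Page> {
  return of(fakePages[url]);
}

// Emits the items of each page as soon as that page's "request" completes.
function streamTracks(): Observable<string[]> {
  return getSegment().pipe(
    // Request the next page while there is one; EMPTY ends the recursion.
    expand((page) => (page.next ? getSegment(page.next) : EMPTY)),
    map((page) => page.items),
  );
}

const received: string[][] = [];
streamTracks().subscribe((items) => received.push(items));
```

A component subscribing to streamTracks() would get three emissions here, one per page, and could append each batch to its list as it arrives.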

Upvotes: 0

PixelBucket

Reputation: 11

While I love @picci's accepted answer, I wanted to present a slightly adjusted version. This highlights that there are usually multiple ways to achieve the same thing with RxJS :).

  public getTracks(url?: string): Observable<any> {
    return this.http
      .get(url?.length ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`)
      .pipe(
        // Crucial: stops the recursive call when `next` is null.
        // Caveat: takeWhile is exclusive by default, so the page whose `next` is null is not emitted.
        takeWhile((data: any) => data.next !== null),
      );
  }

  public getAllTracks(): Observable<any[]> {
    // the first call is with no parameter so that the default url with no offset is used
    return this.getTracks().pipe(
      // expand is used to call recursively getTracks until next is null
      expand(data => this.getTracks(data.next)),
      // with tap you can see the result returned by each call
      tap(data => console.log(data)),
      // if you want you can use the reduce operator to eventually emit the
      // accumulated array with all items
      reduce((acc, val) => [...acc, ...val.items], []),
    );
  }

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
this.getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

The key is takeWhile: it completes the stream as soon as its predicate returns false, without emitting the value that failed the test. Both answers work great, though :).

Here is the adapted example: https://stackblitz.com/edit/rxjs-tcbdue?file=index.ts
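Since takeWhile does the stopping here, it may help to see how its optional second argument (inclusive) changes what happens to the last page. The sketch below is only an illustration: the Page interface, the pages array, and the collect helper are invented, and from(pages) replaces the chain of HTTP responses.

```typescript
import { from } from 'rxjs';
import { reduce, takeWhile } from 'rxjs/operators';

// Hypothetical page shape: only `next` and `items` come from the question.
interface Page {
  next: string | null;
  items: number[];
}

// Invented pages; the last one has next === null, as described in the question.
const pages: Page[] = [
  { next: 'page-2', items: [1, 2] },
  { next: 'page-3', items: [3, 4] },
  { next: null, items: [5] },
];

// Collects all items that make it past takeWhile.
const collect = (inclusive: boolean): number[] => {
  let result: number[] = [];
  from(pages)
    .pipe(
      takeWhile((page) => page.next !== null, inclusive),
      reduce((acc, page) => [...acc, ...page.items], [] as number[]),
    )
    .subscribe((all) => (result = all));
  return result;
};

const exclusiveResult = collect(false); // [1, 2, 3, 4]: the last page is dropped
const inclusiveResult = collect(true); // [1, 2, 3, 4, 5]: the last page is kept
```

With the default (exclusive) behaviour, the items of the final page never reach reduce, which is worth keeping in mind when the final page still carries data.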

Upvotes: 0

Picci

Reputation: 17752

If I understand the issue right, I would use the expand operator to build a solution.

Here is the code I would use. Comments are inline.

// imports needed by this snippet
import { EMPTY, Observable } from 'rxjs';
import { expand, reduce, tap } from 'rxjs/operators';

public getTracks(url?: string): Observable<any> {
  return this.http.get(url && url.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
}

public getAllTracks(): Observable<any[]> {
  // the first call is with no parameter so that the default url with no offset is used
  return this.getTracks().pipe(
    // expand is used to call getTracks recursively until next is null
    expand(data => data.next === null ? EMPTY : this.getTracks(data.next)),
    // with tap you can see the result returned by each call
    tap(data => console.log(data)),
    // if you want you can use the reduce operator to eventually emit the
    // accumulated array with all items
    reduce((acc, val) => [...acc, ...val.items], [] as any[])
  );
}

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
this.getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

ADDITIONAL EXPLANATIONS after the comments of @skyleguy

The tap operator is used to implement side effects. In other words it receives all the notifications from the upstream, does whatever it needs to do with the data notified and then passes downstream the same notification. There is no need to return anything from the function passed to the tap operator. The upstream is just passed downstream after the side effect is applied. In this example the side effect is simply the print on the console of the data passed with the notification.

The reduce used within the pipe is the reduce operator of RxJs and not the reduce method of Array. The reduce RxJs operator accumulates all the data notified from upstream and emits only one value when upstream completes. So, in this example, every time the call to the remote function returns something, this something enters the reduce operator and contributes to the accumulation logic. When expand returns the EMPTY Observable, at the end of the recursion, the EMPTY Observable just completes without notifying anything, which means that upstream completes and therefore reduce can emit its first and only notification, i.e. the array with all items accumulated, and then complete.

This stackblitz replicates this logic with a simulation of the remote call.
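For readers without access to the stackblitz, a comparable simulation can be sketched as follows. This is not the original stackblitz code: the TrackPage interface, the responses map, and its URLs and track names are invented, and of() plays the role of the remote call. It shows that tap sees every page while reduce emits exactly once, when expand finishes with EMPTY.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { expand, reduce, tap } from 'rxjs/operators';

// Hypothetical page shape: only `next` and `items` come from the question.
interface TrackPage {
  next: string | null;
  items: string[];
}

// Stand-in for the remote API: URLs and track names are invented.
const responses = new Map<string, TrackPage>([
  ['/tracks?offset=0', { next: '/tracks?offset=2', items: ['t1', 't2'] }],
  ['/tracks?offset=2', { next: '/tracks?offset=4', items: ['t3', 't4'] }],
  ['/tracks?offset=4', { next: null, items: ['t5'] }],
]);

// Simulated getTracks: emits the stored page for a URL, or nothing if unknown.
function getTracks(url: string = '/tracks?offset=0'): Observable<TrackPage> {
  const page = responses.get(url);
  return page ? of(page) : EMPTY;
}

let pagesSeen = 0; // incremented by tap, once per page
const emissions: string[][] = []; // everything the subscriber receives

getTracks()
  .pipe(
    // Recurse until the page says there is no next URL.
    expand((page) => (page.next === null ? EMPTY : getTracks(page.next))),
    // Side effect only: the page is passed downstream unchanged.
    tap(() => pagesSeen++),
    // Accumulates silently and emits a single array on completion.
    reduce((acc, page) => [...acc, ...page.items], [] as string[]),
  )
  .subscribe((allItems) => emissions.push(allItems));
```

In this run tap fires three times, while the subscriber is notified once with all five items.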

Upvotes: 6

Joshua McCarthy

Reputation: 1842

While other answers mention takeWhile(), I would like to use it in a simpler, more declarative solution. One more thing: while I used Observable<any[]> (I'm assuming items is an array), I would strongly advise adding a type declaration for the object returned from the API.

public getAllTracks$: Observable<any[]>;
private nextUrl = new BehaviorSubject<string>('https://api.spotify.com/v1/me/tracks?limit=50');


constructor(private http: HttpClient) {
    this.getAllTracks$ = this.nextUrl.pipe(
        mergeMap(url => this.http.get<any>(url)),
        // inclusive = true so the final page (next === null) is still emitted
        takeWhile(res => !!res.next, true),
        // only queue another request when there is a next page
        tap(res => { if (res.next) { this.nextUrl.next(res.next); } }),
        scan((items, res) => [...items, ...res.items], [] as any[])
    );
}

First we define a BehaviorSubject that will emit what URL to call for each request, starting with the initial value. Then we define our observable in the constructor. No additional properties or methods needed.

getAllTracks$ subscribes to the BehaviorSubject and immediately receives the first url (the string we included in its declaration).

We use mergeMap() so that every URL emitted by the BehaviorSubject triggers its own API request and none is cancelled. (If we used switchMap(), any new URL received from our BehaviorSubject would cancel any in-progress API call and would "switch" to the new URL value.) I also find this more straightforward than using expand().

Then our takeWhile() checks whether the response has a non-null next value. If it does, we continue to the last two operators. If it doesn't, takeWhile() completes the stream, which ends every subscription.

tap() is best used for external state updates. Because we have our next URL, we can send it to our BehaviorSubject, queueing the next API call.

Lastly, we use scan() to combine the results. The scan() operator is like reduce(), but it emits the accumulated value after every source emission instead of only once on completion. It starts from an empty array and appends the items of each API response to it.
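To make the difference between scan() and reduce() concrete, and to show what a type declaration for the response might look like, here is a small sketch. The PagedResponse and Track types are assumptions (only next and items appear in the question), the pages array is invented, and from(pages) stands in for the sequence of API responses.

```typescript
import { from } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

// Hypothetical response type; only `next` and `items` are taken from the question.
interface PagedResponse<T> {
  next: string | null;
  items: T[];
}

// Assumed item shape, for illustration only.
interface Track {
  name: string;
}

// Invented responses standing in for three API calls.
const pages: PagedResponse<Track>[] = [
  { next: 'url-2', items: [{ name: 'a' }] },
  { next: 'url-3', items: [{ name: 'b' }] },
  { next: null, items: [{ name: 'c' }] },
];

// Shared accumulator: appends the track names of one page to the running list.
const appendNames = (acc: string[], page: PagedResponse<Track>): string[] => [
  ...acc,
  ...page.items.map((track) => track.name),
];

const scanOutput: string[][] = [];
const reduceOutput: string[][] = [];

// scan emits the running total after every page.
from(pages)
  .pipe(scan(appendNames, [] as string[]))
  .subscribe((names) => scanOutput.push(names));

// reduce emits only the final total, once the source completes.
from(pages)
  .pipe(reduce(appendNames, [] as string[]))
  .subscribe((names) => reduceOutput.push(names));
```

So scan suits a view that should grow as pages arrive, while reduce suits a consumer that only wants the complete list.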

Upvotes: 0
