PiwiTheKiwi

Reputation: 129

Use RxJS pipe() to turn array into stream of asynchronous values in Angular

type Movie = {id: string};
type FullMovie = {id: string, picture: string};

I have a url that returns an array of type Movie:

http.get<Movie[]>(url).subscribe((res: Movie[]) => {})

For each movie in the array, I call http.get(movie.id), which returns a FullMovie:

http.get<FullMovie>(movie.id).subscribe((res: FullMovie) => {})

So in essence I want to create a method that returns a stream of FullMovie objects, emitted as the requests resolve: getAll = (url): Observable<FullMovie>

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
    // must pipe the array into a stream of FullMovies, not a stream of FullMovie Observables;
    // I don't want to subscribe to each of the returned FullMovies
    // something like:
    // .pipe(map(array => array.forEach(movie => http.get(movie.id))))
}

At the moment I have the following solution, which works, but I want a more concise one:

 private getFull = (queryGroup: string): Observable<TMDBMovie> =>
    new Observable<TMDBMovie>((observer) => {
      // get the movie array
      this.httpGet(queryGroup).subscribe((movies) => {
        const len = movies.length;
        let j = 0;

        if (len === 0) return observer.complete();

        // complete the outer stream once every request has either emitted or failed
        const complete = (arg: any = 0) => {
          if (++j === len) observer.complete();
        };

        // loop through the elements
        movies.forEach((movie) => {
          this.getById(movie.id).subscribe(
            (res) => complete(observer.next(res)),
            (error) => complete()
          );
        });
      });
    });

EDIT:

This works

newGetFull = (queryGroup: string) =>
    this.httpGet(queryGroup)
      .pipe(concatMap((arr) => from(arr)))
      .pipe(
        mergeMap((movie) => this.getById(movie.id).pipe(catchError(() => of())))
      );

Upvotes: 1

Views: 6343

Answers (1)

Picci

Reputation: 17762

You may want to try something along these lines

getAll = (url): Observable<FullMovie> => {
  return http.get(url)
   .pipe(
      // turn the array Movie[] into a stream of Movie, i.e. an Observable<Movie>
      concatMap(arrayOfMovies => from(arrayOfMovies)),
      // then use mergeMap to "flatten" the various Observable<FullMovie> that you get by calling http.get(movie.id)
      // in other words, with mergeMap, you turn a stream of Observables into a stream of the results emitted as each Observable resolves
      mergeMap(movie => http.get(movie.id))
   )
}
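
To try the pipeline without a backend, here is a minimal self-contained sketch. fetchMovies and fetchFullMovie are hypothetical stand-ins for http.get(url) and http.get(movie.id), and the failing id 'b' is invented purely to show that catching the error on the inner Observable lets the rest of the stream carry on.

```typescript
import { EMPTY, Observable, from, of, throwError } from 'rxjs';
import { catchError, concatMap, mergeMap } from 'rxjs/operators';

type Movie = { id: string };
type FullMovie = { id: string; picture: string };

// Hypothetical stand-in for http.get(url): emits the whole array once.
const fetchMovies = (): Observable<Movie[]> =>
  of([{ id: 'a' }, { id: 'b' }, { id: 'c' }]);

// Hypothetical stand-in for http.get(movie.id); id 'b' is made to fail.
const fetchFullMovie = (id: string): Observable<FullMovie> =>
  id === 'b'
    ? throwError(() => new Error(`no details for ${id}`))
    : of({ id, picture: `${id}.png` });

const getAll = (): Observable<FullMovie> =>
  fetchMovies().pipe(
    // Movie[] -> one emission per Movie
    concatMap((movies) => from(movies)),
    // Movie -> FullMovie; a failed request is dropped instead of
    // erroring the whole stream
    mergeMap((movie) =>
      fetchFullMovie(movie.id).pipe(catchError(() => EMPTY))
    )
  );

const received: string[] = [];
let completed = false;
getAll().subscribe({
  next: (movie) => received.push(movie.id),
  complete: () => {
    completed = true;
  },
});
```

Because the catchError sits inside the mergeMap callback, only the failing inner request is swallowed; placing it after mergeMap would instead end the outer stream at the first error.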

Keep in mind that with mergeMap, as above, there is no guarantee that the final stream emits in the same order as the array of Movies returned by the first call. Each http.get(movie.id) can take a different amount of time to return, so results arrive in completion order rather than array order.

If you need to guarantee the order, use concatMap rather than mergeMap (actually concatMap is mergeMap with concurrency set to 1).
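
The ordering difference can be seen in a small sketch. The latencies and the fetchFullMovie helper are made up for illustration, and RxJS's VirtualTimeScheduler is used only so the simulated delays run deterministically and instantly; real HTTP calls would not take a scheduler.

```typescript
import { Observable, VirtualTimeScheduler, from, of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

type Movie = { id: string };
type FullMovie = { id: string; picture: string };

// Virtual clock: delays are simulated, nothing actually waits.
const scheduler = new VirtualTimeScheduler();

// Hypothetical response times in virtual milliseconds.
const latency: Record<string, number> = { slow: 30, fast: 10 };

// Hypothetical stand-in for http.get(movie.id).
const fetchFullMovie = (movie: Movie): Observable<FullMovie> =>
  of({ id: movie.id, picture: `${movie.id}.png` }).pipe(
    delay(latency[movie.id], scheduler)
  );

const movies: Movie[] = [{ id: 'slow' }, { id: 'fast' }];

// mergeMap starts both requests at once and emits as each one finishes.
const mergeOrder: string[] = [];
from(movies)
  .pipe(mergeMap(fetchFullMovie))
  .subscribe((movie) => mergeOrder.push(movie.id));

// concatMap starts the next request only after the previous one completes.
const concatOrder: string[] = [];
from(movies)
  .pipe(concatMap(fetchFullMovie))
  .subscribe((movie) => concatOrder.push(movie.id));

// Run all scheduled virtual-time work.
scheduler.flush();
```

After the flush, mergeOrder holds the ids in completion order while concatOrder follows the array order. The price of concatMap is that the requests run one after another instead of in parallel.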

If you want all the http.get(movie.id) to complete before returning the result, then use forkJoin rather than mergeMap like this

getAll = (url): Observable<FullMovie[]> => {
  return http.get(url)
   .pipe(
      // turn the array Movie[] into an array of Observable<FullMovie>
      map(arrayOfMovies => arrayOfMovies.map(movie => http.get(movie.id))),
      // then use forkJoin to resolve all the Observables in parallel
      concatMap(arrayOfObservables => forkJoin(arrayOfObservables))
   )
}

getAll(url).subscribe(
   arrayOfFullMovies => {
     // the result notified by forkJoin is an array of FullMovie objects
   }
)
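
One thing to watch with forkJoin: given an empty array it completes without emitting anything, so a subscriber expecting an array would never be called. A possible way to handle that, sketched with hypothetical stand-ins (fetchMovies, fetchFullMovie) in place of the http calls:

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type Movie = { id: string };
type FullMovie = { id: string; picture: string };

// Hypothetical stand-in for http.get(url).
const fetchMovies = (ids: string[]): Observable<Movie[]> =>
  of(ids.map((id) => ({ id })));

// Hypothetical stand-in for http.get(movie.id).
const fetchFullMovie = (movie: Movie): Observable<FullMovie> =>
  of({ id: movie.id, picture: `${movie.id}.png` });

const getAllAtOnce = (ids: string[]): Observable<FullMovie[]> =>
  fetchMovies(ids).pipe(
    concatMap((movies) =>
      movies.length === 0
        ? // forkJoin([]) would complete silently, so emit an empty array
          of([] as FullMovie[])
        : // emits once, with results in the same order as the input array
          forkJoin(movies.map(fetchFullMovie))
    )
  );

const batches: FullMovie[][] = [];
getAllAtOnce(['a', 'b']).subscribe((batch) => batches.push(batch));
getAllAtOnce([]).subscribe((batch) => batches.push(batch));
```

Unlike mergeMap, forkJoin keeps the input order in its result array, but it emits nothing until every request has completed.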

Upvotes: 3
