user1202032

Reputation: 1479

Typescript awaiting multiple subscriptions

I have an Observable that emits a list of IDs. For each ID, I want to fetch the object that the ID represents. Doing this requires me to get an Observable for each object. How do I make sure all objects have been received before continuing?

This is a many-to-many relation in a database.

getExercises(programKey: string): Observable<Array<Exercise>> {
    let source = Observable.create(observer => {
      // ... stuff here ...
      programExercises.subscribe(programExercisesSnapshot => {
        let exercises = Array<Exercise>();
        programExercisesSnapshot.forEach(programExercise => {
          let exercise = this.getExercise(programExercise.ExerciseKey); // Returns Observable<Exercise>
          exercise.subscribe(exerciseSnapshot => exercises.push(exerciseSnapshot)); // TODO: Need to await all these subscriptions before calling observer.next()
        });
        observer.next(exercises);
      });

      return () => { }; // Dispose
    });

    return source;
}

Thanks in advance!

Upvotes: 1

Views: 1364

Answers (1)

Alexander Leonov

Reputation: 4794

Well, apart from the fact that emitting an Array from an Observable looks a bit odd, this is how I'd do it:

getExercises(programKey: string): Observable<Array<Exercise>> {
    // ... stuff here ...
    return programExercises
        // assuming that programExercisesSnapshot is an array or can be easily converted to it
        .flatMap(programExercisesSnapshot => Observable
            .from(programExercisesSnapshot)
            .flatMap(programExercise => this.getExercise(programExercise.ExerciseKey))
            .bufferCount(programExercisesSnapshot.length));
}

Now let's see how this is supposed to work. Let's start with the inner pipeline.

  1. we create an observable from the array programExercisesSnapshot, which then emits its elements one by one;
  2. using flatMap(), we replace each element in the flow with the results of the observable returned by this.getExercise(programExercise.ExerciseKey);
  3. bufferCount() gathers programExercisesSnapshot.length elements into a single array and emits it as a result.

So, this whole pipeline emits arrays of results of this.getExercise() calls.
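
Here is a minimal sketch of just that inner pipeline, written with the pipeable operators of RxJS 6 and later, where flatMap is called mergeMap. The Exercise shape, the standalone getExercise() and the loadExercises() helper are hypothetical stand-ins so that the snippet runs on its own; they are not the asker's real code.

```typescript
import { Observable, from, of } from 'rxjs';
import { bufferCount, mergeMap } from 'rxjs/operators';

// Hypothetical shape; the real Exercise type is not shown in the question.
interface Exercise { key: string; name: string; }

// Hypothetical stand-in for this.getExercise(): emits one Exercise, then completes.
function getExercise(key: string): Observable<Exercise> {
  return of({ key, name: `Exercise ${key}` });
}

// Inner pipeline: emit the keys one by one, swap each key for its Exercise,
// then collect keys.length results into a single array.
function loadExercises(keys: string[]): Observable<Exercise[]> {
  return from(keys).pipe(
    mergeMap(key => getExercise(key)),
    bufferCount(keys.length)
  );
}

const batches: Exercise[][] = [];
loadExercises(['a', 'b', 'c']).subscribe(batch => batches.push(batch));
```

Two things to keep in mind with this approach: mergeMap emits results in the order they arrive, which need not be the order of the keys when the lookups are asynchronous, and an empty key list produces no emission at all, because bufferCount has nothing to flush.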

Now, the outer thing does the following:

  1. it takes batches emitted by programExercises;
  2. replaces them with the results (i.e. arrays) emitted by the inner observable described above;
  3. and emits those results as its own.

Profit! :)
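
To see both levels working together, here is a sketch under the same assumptions (RxJS 6+ pipeable operators, mergeMap in place of flatMap). programExercises, getExercise() and the two interfaces are hypothetical stand-ins; in the real service they would come from the database layer.

```typescript
import { Observable, from, of } from 'rxjs';
import { bufferCount, mergeMap } from 'rxjs/operators';

// Hypothetical shapes, for illustration only.
interface Exercise { key: string; }
interface ProgramExercise { ExerciseKey: string; }

// Hypothetical source: two snapshots of the join table.
const programExercises: Observable<ProgramExercise[]> = of(
  [{ ExerciseKey: 'a' }, { ExerciseKey: 'b' }],
  [{ ExerciseKey: 'c' }]
);

// Hypothetical stand-in for this.getExercise().
function getExercise(key: string): Observable<Exercise> {
  return of({ key });
}

function getExercises(): Observable<Exercise[]> {
  return programExercises.pipe(
    // Outer level: replace each snapshot with the array built by the inner pipeline.
    mergeMap(snapshot => from(snapshot).pipe(
      mergeMap(programExercise => getExercise(programExercise.ExerciseKey)),
      bufferCount(snapshot.length)
    ))
  );
}

const emitted: string[][] = [];
getExercises().subscribe(exercises => emitted.push(exercises.map(e => e.key)));
```

Each snapshot from the source ends up as one array for the subscriber, so the two snapshots above produce two arrays.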

One more thing you missed in your original solution is cleanup. When you call programExercises.subscribe() yourself, you also need to unsubscribe from it manually. Doing it the way I suggested eliminates that need: rxjs will take care of it for you.
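
A small sketch of what that means in practice, assuming RxJS 6+ pipeable operators: unsubscribing once from the composed observable also runs the teardown of the source. The never-completing source and the log array are hypothetical, added only to make the teardown visible.

```typescript
import { Observable, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical source that emits one snapshot, never completes,
// and records when its teardown function runs.
const programExercises = new Observable<string[]>(subscriber => {
  subscriber.next(['a', 'b']);
  return () => { log.push('source torn down'); };
});

// Hypothetical stand-in for this.getExercise().
const getExercise = (key: string): Observable<string> => of(`exercise ${key}`);

const subscription = programExercises
  .pipe(
    mergeMap(keys => from(keys)),
    mergeMap(key => getExercise(key))
  )
  .subscribe(value => log.push(value));

const tornDownBeforeUnsubscribe = log.indexOf('source torn down') !== -1;

// A single unsubscribe on the outer subscription releases the whole chain.
subscription.unsubscribe();
```

With the manual nested subscribe() calls from the question, the function returned from Observable.create would have to unsubscribe from each of those inner subscriptions itself.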

Also, as I said at the beginning, emitting an Array from the observable looks a little bit odd. I hope you have a good reason for doing that. :) Otherwise you may want to consider converting it into an observable that emits the elements one by one as well.

UPDATED.

Since the author confessed :) that arrays are not needed, here is another solution, simpler and more elegant:

getExercises(programKey: string): Observable<Exercise> {
    // ... stuff here ...
    return programExercises
        // assuming that programExercisesSnapshot is an array or can be easily converted to it
        .flatMap(programExercisesSnapshot => Observable.from(programExercisesSnapshot))
        .flatMap(programExercise => this.getExercise(programExercise.ExerciseKey));
}
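
For reference, a runnable sketch of this flattened variant with RxJS 6+ pipeable operators (mergeMap in place of flatMap). As before, programExercises, getExercise() and the interfaces are hypothetical stand-ins, not the real service code.

```typescript
import { Observable, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical shapes, for illustration only.
interface Exercise { key: string; }
interface ProgramExercise { ExerciseKey: string; }

// Hypothetical source: two snapshots of the join table.
const programExercises: Observable<ProgramExercise[]> = of(
  [{ ExerciseKey: 'a' }, { ExerciseKey: 'b' }],
  [{ ExerciseKey: 'c' }]
);

// Hypothetical stand-in for this.getExercise().
function getExercise(key: string): Observable<Exercise> {
  return of({ key });
}

function getExercises(): Observable<Exercise> {
  return programExercises.pipe(
    // Flatten each snapshot into its individual rows...
    mergeMap(snapshot => from(snapshot)),
    // ...and replace each row with the Exercise it points to.
    mergeMap(programExercise => getExercise(programExercise.ExerciseKey))
  );
}

const keys: string[] = [];
getExercises().subscribe(exercise => keys.push(exercise.key));
```

The subscriber now receives exercises one at a time and no longer sees where one snapshot ends and the next begins.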

Upvotes: 2
