Jordan Lewallen

Reputation: 1851

Combine multiple observables into a single RxJS stream

I don't know which RxJS operator solves the following problem:

In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:

this.submissionId = parseInt(params['album']);

if (this.submissionId) {
  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.subscribe((submission) => {
      //submission loaded here!
  });
}

Easy enough! However, once I've loaded the submission, I have to load some auxiliary information such as the current user (to check if they are the artist of the submission) and comments. In order to avoid nested subscriptions, I can modify the above query to use switchMap to switch the query stream to user and comments observables once the submission resolves:

// stream to query for the submission and then switch query to user
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
).subscribe((user) => {
  // needs value of submission here
  if (user.id == this.submission.user.id) {
    //user is owner of submission
  }
})

// stream to query for the submission and then switch query to comments
this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
).subscribe((comments) => {
  this.comments = comments;
})

Great! I've avoided the nested subscription issue, BUT now the first part of each pipeline is identical: both query the submission. Basically, once the submission is queried, I want to launch two parallel queries: one for the current user and one for the comments.

Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:

.subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
})

I believe mergeMap is sort of what I need but I'm not sure how to implement that properly. Or is this a case where I should share() the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!

Upvotes: 0

Views: 3361

Answers (2)

wentjun

Reputation: 42576

You can use the RxJS forkJoin function for this scenario. As stated in the documentation:

When all observables complete, emit the last emitted value from each.

const userQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
)

// stream to query for the submission and then switch query to comments
const commentsQuery$ = this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id //needs submission response here
    })
    return this.comments$.valueChanges
  })
)

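// forkJoin emits only after both sources complete; long-lived streams
// such as watch queries may need something like take(1) first
forkJoin([userQuery$, commentsQuery$]).subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
})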
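
One caveat worth spelling out: forkJoin stays silent until every source completes, and streams like valueChanges or an auth user$ often stay open. Here is a minimal sketch of that behaviour, assuming RxJS 6.5 or later. The user$ and comments$ below are hypothetical stand-ins, not the real services from the question.

```typescript
import { BehaviorSubject, forkJoin, of } from 'rxjs';
import { take } from 'rxjs/operators';

interface User { id: number }

// Hypothetical stand-ins: a long-lived user stream and a comments stream that completes.
const user$ = new BehaviorSubject<User>({ id: 1 });
const comments$ = of(['first', 'second']);

// user$ never completes, so this forkJoin never emits.
const withoutTake: Array<[User, string[]]> = [];
forkJoin([user$, comments$]).subscribe(pair => withoutTake.push(pair));

// take(1) completes the user stream after its current value, so forkJoin emits once.
const withTake: Array<[User, string[]]> = [];
forkJoin([user$.pipe(take(1)), comments$]).subscribe(pair => withTake.push(pair));

console.log(withoutTake.length); // 0
console.log(withTake);           // [ [ { id: 1 }, [ 'first', 'second' ] ] ]
```

If you need to keep reacting to later emissions (a refetched query, a user logging out), take(1) throws those away, so forkJoin may not be the right fit in that case.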

Upvotes: 1

AliF50

Reputation: 18889

Try:

  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.pipe(
    switchMap(submission => {
      this.submission = submission;
      const user$ = this.auth.user$;
      this.comments$ = this.commentsGQL.watch({
        submissionId: submission.id
      });
      // combineLatest needs observables, so pass the query's valueChanges, not the query reference itself
      return combineLatest([user$, this.comments$.valueChanges]);
    }),
    // consider adding a takeUntil here to unsubscribe when the component is destroyed and avoid memory leaks
  ).subscribe(([user, comments]) => {
    // check if user == submission.user.id here
    // also assign comments to component variable here
  });
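
To see the shape of this pattern in isolation, here is a rough, self-contained sketch, assuming RxJS 6.5 or later. The submission$, user$ and commentsFor names are hypothetical stand-ins for the GraphQL queries and the auth service. It also shows one way to keep the submission in scope for the ownership check without an instance variable, by mapping inside the switchMap.

```typescript
import { combineLatest, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface User { id: number }
interface Submission { id: number; user: User }

// Hypothetical stand-ins for submissionGQL, auth.user$ and commentsGQL.
const submission: Submission = { id: 10, user: { id: 1 } };
const currentUser: User = { id: 1 };
const submission$ = of(submission);
const user$ = of(currentUser);
const commentsFor = (submissionId: number) => of([`comment on ${submissionId}`]);

const results: Array<{ isOwner: boolean; comments: string[] }> = [];

submission$.pipe(
  // Once the submission arrives, start both dependent streams in parallel.
  switchMap(sub =>
    combineLatest([user$, commentsFor(sub.id)]).pipe(
      // sub is still in scope here, so no instance variable is needed.
      map(([user, comments]) => ({ isOwner: user.id === sub.user.id, comments }))
    )
  )
).subscribe(result => results.push(result));

console.log(results); // [ { isOwner: true, comments: [ 'comment on 10' ] } ]
```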

Something you should consider is eliminating instance variables with the help of Angular's async pipe (https://malcoded.com/posts/angular-async-pipe/). It subscribes to the observable, renders the emitted value in the view, and automatically unsubscribes when the view is destroyed.

So, using that, we can get rid of this.submission = submission by putting:

submissions$: Observable<ISubmission>; // assuming there is an interface of ISubmission, if not put any

// then when this.submissionId is defined
this.submissions$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges;

// then when using it in your view you can do {{ this.submissions$ | async }}

The same goes for this.comments$. All of this is optional though. I try to minimize instance variables as much as possible when using RxJS because too many of them lead to confusion.

Then you can build the other main stream off the this.submissions$ observable and subscribe to that.

 this.submissions$.pipe(
  switchMap(submission => ..... // everything else being the same
 )
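
One thing to watch if both the async pipe and your own pipeline subscribe to the same observable: a cold source runs once per subscriber unless it is shared. This ties back to the share() idea in the question. Below is a small sketch, assuming RxJS 6.5 or later; the hand-rolled Observable is a hypothetical stand-in for the watch query, and shareReplay is my own suggestion here, not something from the original code.

```typescript
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

interface Submission { id: number; title: string }

let sourceSubscriptions = 0;

// Hypothetical stand-in for a watch query: emits once and stays open.
const submission$ = new Observable<Submission>(subscriber => {
  sourceSubscriptions++;
  subscriber.next({ id: 7, title: 'Demo album' });
}).pipe(
  // Replay the latest submission to late subscribers; tear down when nobody listens.
  shareReplay({ bufferSize: 1, refCount: true })
);

const seenByView: Submission[] = [];
const seenIds: number[] = [];

// First subscriber (think: the async pipe), then a second derived pipeline.
const viewSub = submission$.subscribe(s => seenByView.push(s));
const idSub = submission$.pipe(map(s => s.id)).subscribe(id => seenIds.push(id));

console.log(sourceSubscriptions); // 1
console.log(seenIds);             // [ 7 ]

viewSub.unsubscribe();
idSub.unsubscribe();
```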

I chose combineLatest, but you can use zip or forkJoin as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
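
To make those differences concrete, here is a small comparison on two synchronous streams, assuming RxJS 6.5 or later. The a$ and b$ sources are made up for the demo.

```typescript
import { combineLatest, forkJoin, of, zip } from 'rxjs';

// Made-up sources: both emit synchronously and then complete.
const a$ = of(1, 2, 3);
const b$ = of('x', 'y');

// combineLatest: emits whenever any source emits, once every source has emitted at least once.
const combined: Array<[number, string]> = [];
combineLatest([a$, b$]).subscribe(v => combined.push(v));

// zip: pairs values by index and stops when the shorter source runs out.
const zipped: Array<[number, string]> = [];
zip(a$, b$).subscribe(v => zipped.push(v));

// forkJoin: waits for all sources to complete, then emits the last values once.
const joined: Array<[number, string]> = [];
forkJoin([a$, b$]).subscribe(v => joined.push(v));

console.log(combined); // [ [ 3, 'x' ], [ 3, 'y' ] ]
console.log(zipped);   // [ [ 1, 'x' ], [ 2, 'y' ] ]
console.log(joined);   // [ [ 3, 'y' ] ]
```

The combineLatest result starts at 3 because a$ has already emitted everything synchronously before b$ is subscribed. With asynchronous sources the interleaving will differ.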

Upvotes: 0
