Reputation: 1851
I'm not sure which RxJS operator I should use to solve the following problem:
In my music application, I have a submission page (this is like a music album). To load the submission, I use the following query:
this.submissionId = parseInt(params['album']);
if (this.submissionId) {
  this.submissionGQL.watch({
    id: this.submissionId
  }).valueChanges.subscribe((submission) => {
    // submission loaded here!
  });
}
Easy enough! However, once I've loaded the submission, I have to load some auxiliary information such as the current user (to check if they are the artist of the submission) and comments. In order to avoid nested subscriptions, I can modify the above query to use switchMap to switch the query stream to user and comments observables once the submission resolves:
// stream to query for the submission and then switch query to user
this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
).subscribe((user) => {
  // needs value of submission here
  if (user.id == this.submission.user.id) {
    // user is owner of submission
  }
})
// stream to query for the submission and then switch query to comments
this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id // needs submission response here
    })
    return this.comments$.valueChanges
  })
).subscribe((comments) => {
  this.comments = comments;
})
Great! I've avoided the nested subscription issue, but now the first part of each submission request is identical. Basically, once the submission is queried, I want to launch two parallel queries: one for the user and one for the comments.
Which RxJS operator can perform such an operation? I suppose the subscribe at the end would emit an array response like:
.subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
})
I believe mergeMap is sort of what I need, but I'm not sure how to implement that properly. Or is this a case where I should share() the submission query and then build off my parallel queries separately? I'm very curious! Please let me know, thanks!
Upvotes: 0
Views: 3361
Reputation: 42576
You can use RxJS's forkJoin for this scenario. As stated in the documentation:
When all observables complete, emit the last emitted value from each.
// stream to query for the submission and then switch query to user
const userQuery$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    return this.auth.user$
  })
)
// stream to query for the submission and then switch query to comments
const commentsQuery$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges.pipe(
  switchMap(submission => {
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id // needs submission response here
    })
    return this.comments$.valueChanges
  })
)
// pass the sources as an array and destructure the emitted pair
forkJoin([userQuery$, commentsQuery$]).subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
})
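One caveat worth checking before relying on this: forkJoin emits only once every input has completed, so a long-lived stream (a watched query or a user stream often is one) would keep it silent. Below is a minimal sketch of that behaviour. The BehaviorSubject and `of` sources are hypothetical stand-ins for `this.auth.user$` and the comments query, and `take(1)` is an assumed way to force completion, not something the original code does.

```typescript
import { BehaviorSubject, forkJoin, of } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins, not the real services from the question.
const user$ = new BehaviorSubject({ id: 7 });   // never completes on its own
const comments$ = of(['first', 'second']);      // emits once, then completes

// Without take(1): forkJoin keeps waiting for user$ to complete.
const withoutTake: unknown[] = [];
forkJoin([user$, comments$]).subscribe(pair => withoutTake.push(pair));

// With take(1): user$ is cut off after its current value, so forkJoin can emit.
const withTake: Array<[{ id: number }, string[]]> = [];
forkJoin([user$.pipe(take(1)), comments$]).subscribe(pair => withTake.push(pair));

console.log(withoutTake.length); // 0
console.log(withTake.length);    // 1
```

If later updates to the user or the comments matter, a completion-based combinator may not be the right fit, since `take(1)` drops everything after the first value.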
Upvotes: 1
Reputation: 18889
Try:
this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges.pipe(
  switchMap(submission => {
    this.submission = submission;
    const user$ = this.auth.user$;
    this.comments$ = this.commentsGQL.watch({
      submissionId: submission.id
    });
    // combineLatest needs observables, so pass valueChanges rather than the query itself
    return combineLatest([user$, this.comments$.valueChanges]);
  }),
  // maybe put a takeUntil to remove subscription and not cause memory leaks
).subscribe(([user, comments]) => {
  // check if user == submission.user.id here
  // also assign comments to component variable here
});
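The `takeUntil` hint in the comment above could look roughly like this. It is only a sketch: `destroy$` is a hypothetical name for a teardown signal (in an Angular component it would typically be fired from `ngOnDestroy`), and the BehaviorSubjects are stand-ins for the real submission, user and comments streams.

```typescript
import { BehaviorSubject, Subject, combineLatest, of } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

// Hypothetical stand-ins for the submission query, user stream and comments query.
const submission$ = new BehaviorSubject({ id: 1, user: { id: 7 } });
const user$ = new BehaviorSubject({ id: 7 });
const commentsFor = (submissionId: number) =>
  new BehaviorSubject([`comment on ${submissionId}`]);

// Assumed teardown signal; in a component, call destroy$.next() from ngOnDestroy.
const destroy$ = new Subject<void>();

const ownerChecks: boolean[] = [];
const subscription = submission$.pipe(
  switchMap(submission =>
    // carry the submission along so no instance variable is needed
    combineLatest([of(submission), user$, commentsFor(submission.id)])
  ),
  takeUntil(destroy$) // placed last so the inner streams are torn down too
).subscribe(([submission, user]) => {
  ownerChecks.push(user.id === submission.user.id);
});

destroy$.next();       // simulate the component being destroyed
destroy$.complete();
user$.next({ id: 8 }); // ignored: the pipeline has already been unsubscribed
```

Keeping `takeUntil` as the last operator in the pipe matters here; placed before `switchMap`, it would stop the outer stream but leave the inner `combineLatest` running.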
Something you should consider is eliminating instance variables with the help of the async pipe provided by Angular (https://malcoded.com/posts/angular-async-pipe/).
It subscribes to the observable, renders its latest value in the view, and automatically unsubscribes when the view is destroyed.
So, using that, we can get rid of this.submission = submission by writing:
submissions$: Observable<ISubmission>; // assuming there is an interface of ISubmission, if not put any
// then when this.submissionId is defined
this.submissions$ = this.submissionGQL.watch({
  id: this.submissionId
}).valueChanges;
// then when using it in your view you can do {{ this.submissions$ | async }}
The same goes for this.comments$. All of this is optional, though. I try to minimize instance variables as much as possible when using RxJS, because too many of them lead to confusion.
You can then build on the this.submissions$ observable and subscribe to it for the other main stream:
this.submissions$.pipe(
  switchMap(submission => ..... // everything else being the same
)
I chose the combineLatest operator, but you can use zip or forkJoin as you see fit. They all have subtle differences (https://scotch.io/tutorials/rxjs-operators-for-dummies-forkjoin-zip-combinelatest-withlatestfrom).
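A small sketch of those differences, using two synchronous toy streams rather than the queries from the question (the `collect` helper is a made-up convenience for recording emissions):

```typescript
import { combineLatest, forkJoin, of, zip } from 'rxjs';

const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2);

// Hypothetical helper: returns a callback that records each emission in `out`.
const collect = <T>(out: T[]) => (value: T): void => { out.push(value); };

const combined: Array<[string, number]> = [];
const zipped: Array<[string, number]> = [];
const joined: Array<[string, number]> = [];

// Emits whenever any source emits, once every source has emitted at least once.
combineLatest([letters$, numbers$]).subscribe(collect(combined));
// Pairs values by position: first with first, second with second.
zip(letters$, numbers$).subscribe(collect(zipped));
// Emits a single time, with the last value of each source, after all complete.
forkJoin([letters$, numbers$]).subscribe(collect(joined));

console.log(combined); // [['c', 1], ['c', 2]]
console.log(zipped);   // [['a', 1], ['b', 2]]
console.log(joined);   // [['c', 2]]
```

The combineLatest result may look surprising: because `of` emits synchronously, letters$ has already run through to 'c' before numbers$ is subscribed. With asynchronous sources the interleaving would depend on timing.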
Upvotes: 0