Reputation: 4163
I have two observables which I want to merge with the forkJoin method from rxjs. Executing the observables independently works, but using forkJoin it does not reach the pipe finalize/subscribe method.
My.component.ts
....
const req1 = this.userService.getUser(this.loggedInUser.userId);
const req2 = this.boardGames$;
this.subscriptions$ = forkJoin([req1, req2])
  .pipe(
    finalize(() => {
      console.log('pipe'); // Is not reached
    })
  )
  .subscribe(([obj1, obj2]) => {
    console.log('subscribe'); // Is not reached
  }, err => console.log(err), () => console.log('compl'));
req1.subscribe((aa) => console.log(aa)); // This is logged
req2.subscribe((bb) => console.log(bb)); // This is logged
....
I am using Angularfire2 for requests. I am not sure if this can be an issue, because independently the subscriptions work.
import { AngularFirestore } from 'angularfire2/firestore';
What is it that I am missing here?
Upvotes: 1
Views: 993
Reputation: 142
forkJoin() needs both of your observables to complete before it emits the joined result. If either observable never completes, forkJoin() will never emit, so your subscribe callback is never reached. If you are using Firebase, your observables typically stay open to listen for changes and don't complete on their own.
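To make the completion requirement concrete, here is a minimal sketch. The names `user$` and `boardGames$` are hypothetical stand-ins (a plain `of()` and a `Subject`), not the asker's real Firestore streams; the Subject simply plays the role of a source that stays open.

```typescript
import { forkJoin, of, Subject } from 'rxjs';

// Hypothetical stand-ins: `user$` completes right away,
// `boardGames$` stays open the way a live database stream would.
const user$ = of({ id: 1 });
const boardGames$ = new Subject<string[]>();

const log: string[] = [];

forkJoin([user$, boardGames$]).subscribe({
  next: ([user, games]) => log.push(`next: user ${user.id}, ${games.length} games`),
  complete: () => log.push('complete'),
});

boardGames$.next(['Catan']);      // a value arrives, but the source is still open
const beforeComplete = [...log];  // still empty: forkJoin keeps waiting

boardGames$.complete();           // now every source has completed
const afterComplete = [...log];   // ['next: user 1, 1 games', 'complete']

console.log(beforeComplete, afterComplete);
```

Nothing is logged until the second source completes, which matches the symptom in the question.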
If your observables don't complete and you need a stream of values from both, try combineLatest() instead. It takes the two live observables and, once each has emitted at least one value, emits the latest values from both together. It then keeps emitting whenever either source emits, until the sources complete.
Here is a link for combineLatest
If you only need to check whether the user is valid before making a call to Firebase, try switchMap(). This will switch from your user observable to your boardgame observable, so downstream you only deal with the boardgame observable.
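A rough sketch of the switchMap() idea, under some assumptions: `getUser`, the `valid` flag, and the `BehaviorSubject` used for `boardGames$` are all hypothetical placeholders for the asker's service and Firestore stream.

```typescript
import { BehaviorSubject, Observable, of } from 'rxjs';
import { filter, switchMap } from 'rxjs/operators';

interface User { userId: string; valid: boolean; }

// Hypothetical stand-in for userService.getUser()
function getUser(userId: string): Observable<User> {
  return of({ userId, valid: true });
}

// Hypothetical stand-in for a live board game stream
const boardGames$ = new BehaviorSubject<string[]>(['Catan']);

const received: string[][] = [];

const subscription = getUser('u1').pipe(
  filter(user => user.valid),     // only continue for a valid user
  switchMap(() => boardGames$),   // from here on, only board games flow downstream
).subscribe(games => received.push(games));

boardGames$.next(['Catan', 'Carcassonne']); // later updates still arrive
subscription.unsubscribe();                 // stop listening when done
boardGames$.next(['ignored']);              // not received after unsubscribe

console.log(received); // [['Catan'], ['Catan', 'Carcassonne']]
```

Because the inner stream stays open, the subscription still has to be closed explicitly.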
Upvotes: 0
Reputation: 1026
forkJoin emits only when all observables complete. I can't see the rest of your code (for example, what the boardGames$ observable is). Most likely you are using observables that don't complete after the first emission, which is the intended behavior of AngularFirestore, since most commonly you subscribe to changes in the database (Firebase).
Use combineLatest if you need the latest values whenever any of the observables emits. Keep in mind that it starts emitting only once each of the source observables has emitted at least once.
combineLatest([
  this.userService.getUser(this.loggedInUser.userId),
  this.boardGames$
]).subscribe(([user, boardGames]) => {
  // Don't forget to unsubscribe
});
Use merge if you want to merge the observables into one observable. It would work in your current case, like this:
merge(
  this.userService.getUser(this.loggedInUser.userId).pipe(map(entity => ({entity, type: 'user'}))),
  this.boardGames$.pipe(map(entity => ({entity, type: 'boardGames'})))
).subscribe(({entity, type}) => {
  // Don't forget to unsubscribe
});
With forkJoin you can achieve it like this:
const req1 = this.userService.getUser(this.loggedInUser.userId).pipe(take(1));
const req2 = this.boardGames$.pipe(take(1));
this.subscriptions$ = forkJoin([req1, req2]).subscribe(() => {
  // Runs once, after both observables have emitted; forkJoin then completes.
});
Note that you still need to handle the subscription even if you use take(1): if one of the observables never emits and the component is destroyed, you would have a memory leak. There is an awesome library for handling subscriptions without boilerplate.
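Without any extra library, one common way to cover the "never emits" case is takeUntil() with a destroy notifier. The sketch below is only an illustration, not the library mentioned above: `BoardComponent`, `destroy$`, and `tornDown` are hypothetical names, and `NEVER` stands in for a source that never emits.

```typescript
import { forkJoin, NEVER, of, Subject } from 'rxjs';
import { finalize, take, takeUntil } from 'rxjs/operators';

// Hypothetical component-like class; ngOnDestroy mirrors Angular's lifecycle hook.
class BoardComponent {
  private readonly destroy$ = new Subject<void>();
  tornDown = false;

  load(): void {
    const req1 = of({ userId: 'u1' }).pipe(take(1));
    const req2 = NEVER.pipe(take(1)); // never emits, so take(1) never completes it

    forkJoin([req1, req2]).pipe(
      takeUntil(this.destroy$),                  // end the stream when the component is destroyed
      finalize(() => { this.tornDown = true; }), // runs on completion or unsubscribe
    ).subscribe(() => {
      // Not reached here, because req2 never emits
    });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const component = new BoardComponent();
component.load();
const openBeforeDestroy = !component.tornDown;  // true: still waiting on req2
component.ngOnDestroy();
const closedAfterDestroy = component.tornDown;  // true: the pending forkJoin was torn down

console.log(openBeforeDestroy, closedAfterDestroy);
```

Placing takeUntil() last before finalize() means the pending forkJoin is released as soon as the notifier fires.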
Upvotes: 3