Reputation: 133
I'm a rookie in rxjs, and I'm trying to figure out what happened in the following code.
Indeed, in order to illustrate the idea, I prepare two simple promises (resolvePromise and rejectPromise), one resolves a promise, and the other rejects. A "Subject" instance is used to trigger these two promises (piped with mergeMap and forkJoin). At the end, two separates instructions (source$.next('xxx')) are added, hoping to trigger the subscription twice.
But finally, only the first "source$.next('1')" triggers the subscription, and the next "source$.next('2')" seems to do nothing.
I guess this behavior is caused by the reject promise, which literally "throws an exception". But I'm wondering how can I fix this code, so that the second "source$.next('2')" will also trigger the subscription.
Thanks a lot
import { of,forkJoin,Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
const resolvePromise = val =>
new Promise(resolve => resolve(`resolve value: ${val}`));
const rejectPromise = err =>
Promise.reject(`reject error: ${err}`);
const source$ = new Subject();
source$
.pipe(mergeMap(val => {
return forkJoin([
resolvePromise(val),
rejectPromise(val)
]);
}))
.subscribe(
console.log,
console.error
);
source$.next('1');
source$.next('2');
Based on suggestion given by Andrei Gătej below, in order to overcome this problem, I chose to pipe a catchError after the forkJoin, and place the error handling inside.
source$.pipe(
mergeMap(val => {
return forkJoin([
resolvePromise(val),
rejectPromise(val)
])
.pipe(
catchError(err => {
// your error handling business logic
console.error(err);
return empty();
})
);
}))
.subscribe(console.log);
Upvotes: 0
Views: 443
Reputation: 13539
you need to resubscribe via retry
or repeat
.
import { of,forkJoin,Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
const resolvePromise = val =>
new Promise(resolve => resolve(`resolve value: ${val}`));
const rejectPromise = err =>
Promise.reject(`reject error: ${err}`);
const source$ = new Subject();
source$.pipe(
mergeMap(val => forkJoin([
resolvePromise(val),
rejectPromise(val)
])),
catchError(() => EMPTY),
repeat(),
).subscribe(
console.log,
console.error
);
source$.next('1');
source$.next('2');
Upvotes: 0
Reputation: 11924
reject promise, which literally "throws an exception"
I'd say you're right, judging by how RxJS handles promises internally:
promise.then(
(value) => {
if (!subscriber.closed) {
subscriber.next(value);
subscriber.complete();
}
},
(err: any) => subscriber.error(err) // !
)
What's also worth mentioning is that, as you can see from the snippet above, as soon as the promise resolves for the first time, that resolved value and a complete
notification will be passed along.
The complete
part is very important for the forkJoin
operator.
forkJoin
subscribes to all the provided observables and waits until all complete. It will send an array with values only if the observables would have emitted at least once.
When one observable emits an error
notification, forkJoin
will immediately send that error
notification further in the chain.
Here's my approach:
ource$
.pipe(mergeMap(val => {
return forkJoin([
resolvePromise(val),
from(rejectPromise(val)).pipe(catchError(err => of(err)))
]);
}))
.subscribe(
console.log,
console.error
);
We're using from
so that we can intercept that promise error early, which allows us to use catchError
and use an observable that will send a value and then a complete
notification(of(err)
).
Upvotes: 1