Reputation: 13933
I'm looking for the RxJS equivalent of promise.then(onNextCallback, onErrorCallback).
Is there something like this?
Alternatives such as pipe(concatMap(), catchError()) are not what I'm looking for.
Upvotes: 3
Views: 1486
Reputation: 2741
I asked for this as well and ended up writing my own operator.
import * as RxJS from 'rxjs';

/**
 * Like `promise.then(onFulfilled, onRejected)`. This is *not* the same as
 * `pipe(map(onNext), catchError(error => of(onError(error))))`, because this operator
 * will only catch errors resulting from the original observable, not errors
 * resulting from `onNext`.
 */
export const mapOrCatchError = <T, B>(
  onNext: (value: T) => B,
  onError: (value: unknown) => B,
): RxJS.OperatorFunction<T, B> => ob$ =>
  new RxJS.Observable<B>(observer =>
    ob$.subscribe({
      next: t => {
        let next: B;
        try {
          next = onNext(t);
        } catch (error) {
          observer.error(error);
          return;
        }
        observer.next(next);
      },
      error: error => {
        let next: B;
        try {
          next = onError(error);
        } catch (newError) {
          observer.error(newError);
          return;
        }
        // A source error ends the stream, so emit the fallback and complete.
        observer.next(next);
        observer.complete();
      },
      complete: () => {
        observer.complete();
      },
    }),
  );
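For comparison, the promise behaviour this operator mirrors can be sketched with plain promises, no RxJS involved. The helper names `failingOnNext` and `onError` are made up for illustration. The two-argument `then` does not route an error thrown by its first callback into its second callback, whereas a chained `catch` does handle it:

```typescript
// Hypothetical callbacks, for illustration only.
const failingOnNext = (_value: number): string => {
  throw new Error('onNext failed');
};
const onError = (_error: unknown): string => 'recovered';

// Two-argument form: onError only handles a rejection of the original promise,
// so the error thrown by failingOnNext surfaces as a rejection downstream.
const twoArg = Promise.resolve(1)
  .then(failingOnNext, onError)
  .then(
    value => `fulfilled: ${value}`,
    (error: Error) => `rejected: ${error.message}`,
  );

// Chained form: catch also handles the error thrown by failingOnNext.
const chained = Promise.resolve(1)
  .then(failingOnNext)
  .catch(onError)
  .then(value => `fulfilled: ${value}`);

// twoArg resolves to 'rejected: onNext failed'
// chained resolves to 'fulfilled: recovered'
```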
Tests:
import { marbles } from 'rxjs-marbles/jest';
import { mapOrCatchError } from '../operators';

describe('mapOrCatchError', () => {
  it(
    'should map',
    marbles(m => {
      const source$ = m.cold('--(a|)', { a: 1 });
      const expected = ' --(b|)';
      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { b: 2 });
    }),
  );

  it(
    'should catch',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = ' --(a|)';
      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { a: 0 });
    }),
  );

  it(
    'should error if error handler throws',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = ' --#';
      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => {
            throw error;
          },
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );

  it(
    'should not catch errors thrown by map function',
    marbles(m => {
      const source$ = m.cold('--(a|)');
      const expected = ' --#';
      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          () => {
            throw error;
          },
          _error => 'caught error',
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );
});
Upvotes: 2
Reputation: 2344
As far as I know, there is no built-in operator that does what you need.
What I suggest instead is to create your own.
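import { Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Marks a value as an error caught from the source rather than a regular emission.
type ErrorWrapper = { isError: true; err: unknown };

// The null check keeps the guard from throwing when the source emits null or undefined.
const isErrorWrapper = (value: unknown): value is ErrorWrapper =>
  typeof value === 'object' && value !== null && (value as ErrorWrapper).isError === true;

function promiseLike<T, R, E>(
  onNext: (data: T) => Observable<R>,
  onError: (err: unknown) => Observable<E>
) {
  return function (source: Observable<T>): Observable<R | E> {
    return source.pipe(
      // Wrap a source error so it travels down the value channel.
      catchError((err) => of<ErrorWrapper>({ isError: true, err })),
      // Route wrapped errors to onError and everything else to onNext.
      switchMap((data) => (isErrorWrapper(data) ? onError(data.err) : onNext(data)))
    );
  };
}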
The above wraps the error from the source observable, and then switchMap decides whether to run onNext or onError. Because onError is chosen before onNext ever runs, it cannot catch errors coming from onNext.
Here is an example of usage:
function getResolvingPromise(): Promise<string> {
  return Promise.resolve('SUCCESS');
}

function getErrorPromise(): Promise<string> {
  return Promise.reject('onError');
}

// Example with success
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex1', d)) // Logs Ex1 received SUCCESS

// Example with error
from(getErrorPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex2', d)) // Logs Ex2 3

// Example with an error raised inside onNext (not caught by onError)
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => throwError('Valid token not returned'),
    (err) => of(3)
  ),
  catchError(() => of('catching internal error'))
).subscribe((d) => console.log('Ex3', d)) // Logs Ex3 catching internal error
Upvotes: 1
Reputation: 1124
Is .toPromise() what you are looking for?
https://www.learnrxjs.io/learn-rxjs/operators/utility/topromise
Then you can call .then(nextCallback) followed by .catch(errorCallback).
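As a side note, `toPromise()` is deprecated as of RxJS 7 in favour of `firstValueFrom` and `lastValueFrom`. A minimal sketch of the same idea, assuming RxJS 7 or later; the callback bodies are made up for illustration. Converting to a promise collapses the stream to a single value, so this only fits sources where one result is enough:

```typescript
import { lastValueFrom, of, throwError } from 'rxjs';

// Hypothetical callbacks, named after the ones in the question.
const onNextCallback = (value: number): string => `value ${value}`;
const onErrorCallback = (error: unknown): string => `recovered from ${String(error)}`;

// Source that succeeds: resolves to 'value 1'.
const fromSuccess = lastValueFrom(of(1)).then(onNextCallback, onErrorCallback);

// Source that errors: resolves to 'recovered from boom'.
const fromFailure = lastValueFrom(throwError(() => 'boom')).then(
  onNextCallback,
  onErrorCallback,
);
```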
Upvotes: 0
Reputation: 2947
What about something like this?
Here source$ is the source Observable, and you can provide the corresponding functions via the tap operator inside pipe.
source$.pipe(
  tap({ next: onNextCallback, error: onErrorCallback })
)
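One thing to keep in mind with this approach: `tap` only observes notifications. It does not replace emitted values, and it does not swallow errors. A small sketch of that behaviour, assuming RxJS 7 (for the factory form of `throwError`); the `log` array is only there to record what happens:

```typescript
import { Observable, throwError } from 'rxjs';
import { tap } from 'rxjs/operators';

const log: string[] = [];

// A source that errors immediately.
const source$: Observable<number> = throwError(() => 'boom');

source$
  .pipe(
    tap({
      next: value => { log.push(`tap next: ${value}`); },
      error: error => { log.push(`tap error: ${error}`); },
    }),
  )
  .subscribe({
    next: value => { log.push(`subscriber next: ${value}`); },
    error: error => { log.push(`subscriber error: ${error}`); },
  });

// log is now ['tap error: boom', 'subscriber error: boom']:
// the error callback in tap ran, but the error still reached the subscriber.
```

So if onErrorCallback is meant to recover from the error the way the second argument of `then` does, `tap` alone is not enough.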
Upvotes: 0