Stav Alfi

Reputation: 13933

rxjs - operator with both on-next and on-error callbacks?

I'm looking for the RxJS equivalent of promise.then(onNextCallback, onErrorCallback).

Is there something like this?
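For context, here is a minimal sketch of the promise behaviour being asked about, using only plain promises. The names onNext, onError, withThen, withCatch and demo are made up for illustration. It shows that the two-argument form of then() handles rejections of the original promise only, whereas then(...).catch(...) also handles errors thrown by the first callback.

```typescript
// Hypothetical callbacks: onNext throws for negative input.
const onNext = (n: number): string => {
  if (n < 0) throw new Error('onNext failed');
  return `value ${n}`;
};
const onError = (e: unknown): string =>
  `handled ${e instanceof Error ? e.message : String(e)}`;

// then(onNext, onError): onError sees only rejections of the source promise.
const withThen = (p: Promise<number>): Promise<string> => p.then(onNext, onError);

// then(onNext).catch(onError): onError also sees errors thrown by onNext.
const withCatch = (p: Promise<number>): Promise<string> => p.then(onNext).catch(onError);

async function demo(): Promise<string[]> {
  const results: string[] = [];
  results.push(await withThen(Promise.resolve(1)));
  results.push(await withThen(Promise.reject(new Error('source failed'))));
  results.push(await withCatch(Promise.resolve(-1)));
  // With two-argument then(), the error thrown by onNext stays unhandled,
  // so the resulting promise is still rejected.
  results.push(await withThen(Promise.resolve(-1)).catch(() => 'still rejected'));
  return results;
}

demo().then((results) => console.log(results));
// [ 'value 1', 'handled source failed', 'handled onNext failed', 'still rejected' ]
```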

Upvotes: 3

Views: 1486

Answers (4)

Oliver Joseph Ash

Reputation: 2741

I wanted this as well and ended up writing my own operator.

import * as RxJS from 'rxjs';

/**
 * Like `promise.then(onFulfilled, onRejected)`. This is *not* the same as
 * `map(onNext).catchError(onError)`, because this operator will only catch errors resulting from
 * the original observable—not errors resulting from `onNext`.
 */
export const mapOrCatchError = <T, B>(
  onNext: (value: T) => B,
  onError: (value: unknown) => B,
): RxJS.OperatorFunction<T, B> => ob$ =>
  new RxJS.Observable<B>(observer =>
    ob$.subscribe({
      next: t => {
        let next: B;
        try {
          next = onNext(t);
        } catch (error) {
          observer.error(error);
          return;
        }
        observer.next(next);
      },
      error: error => {
        let next: B;
        try {
          next = onError(error);
        } catch (newError) {
          observer.error(newError);
          return;
        }
        observer.next(next);
        observer.complete();
      },
      complete: () => {
        observer.complete();
      },
    }),
  );

Tests:

import { marbles } from 'rxjs-marbles/jest';
import { mapOrCatchError } from '../operators';

describe('mapOrCatchError', () => {
  it(
    'should map',
    marbles(m => {
      const source$ = m.cold('--(a|)', { a: 1 });
      const expected = '      --(b|)';

      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { b: 2 });
    }),
  );
  it(
    'should catch',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = '      --(a|)';

      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => 0,
        ),
      );
      m.expect(actual$).toBeObservable(expected, { a: 0 });
    }),
  );
  it(
    'should error if error handler throws',
    marbles(m => {
      const source$ = m.cold('--#');
      const expected = '      --#';

      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          a => a + 1,
          _error => {
            throw error;
          },
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );
  it(
    'should not catch errors thrown by map function',
    marbles(m => {
      const source$ = m.cold('--(a|)');
      const expected = '      --#';

      const error = new Error('foo');
      const actual$ = source$.pipe(
        mapOrCatchError(
          () => {
            throw error;
          },
          _error => 'caught error',
        ),
      );
      m.expect(actual$).toBeObservable(expected, undefined, error);
    }),
  );
});

Upvotes: 2

Guilhermevrs

Reputation: 2344

As far as I know, there is no built-in operator that does what you need.

What I suggest instead is creating your own.

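import { Observable, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

function promiseLike<T, R, E>(
  onNext: (data: T) => Observable<R>,
  onError: (err: any) => Observable<E>
) {

  // Marker object that carries a source error through the `next` channel.
  type ErrorWrapper = {isError: true, err: any};
  // Guard against null/undefined and primitive values before reading the flag.
  const isErrorWrapper = (data: any): data is ErrorWrapper => {
    return typeof data === 'object' && data !== null && data.isError === true;
  }

  return function(source: Observable<T>): Observable<R | E> {
    return source.pipe(
      // Turn a source error into a wrapped value so switchMap can route it.
      catchError((err) => of<ErrorWrapper>({isError: true, err})),
      switchMap((data) => isErrorWrapper(data) ? onError(data.err) : onNext(data))
    );
  }
}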

The operator wraps any error from the source observable in a marker object, and switchMap then decides whether to run onNext or onError. Because catchError sits before switchMap, onError never catches errors coming from onNext.

Here is an example of usage:

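// Besides promiseLike, these examples need `from`, `of` and `throwError` from 'rxjs'
// and `catchError` from 'rxjs/operators'.
function getResolvingPromise(): Promise<string> {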
  return Promise.resolve('SUCCESS');
}

function getErrorPromise(): Promise<string> {
  return Promise.reject('onError');
}

// Example with success
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex1', d)) // Logs Ex1 received SUCCESS

// Example with error
from(getErrorPromise()).pipe(
  promiseLike(
    (data) => of(`received ${data}`),
    (err) => of(3)
  )
).subscribe((d) => console.log('Ex2', d)) // Logs Ex2 3

// Example with an internal error (thrown by onNext, so onError does not handle it)
from(getResolvingPromise()).pipe(
  promiseLike(
    (data) => throwError('Valid token not returned'),
    (err) => of(3)
  ),
  catchError(() => of('catching internal error'))
).subscribe((d) => console.log('Ex3', d)) // Logs Ex3 catching internal error

Upvotes: 1

Tawfik Nasser

Reputation: 1124

Is .toPromise() what you are looking for?

https://www.learnrxjs.io/learn-rxjs/operators/utility/topromise

Then you can chain .then(nextCallback).catch(errorCallback), or pass both callbacks to .then(nextCallback, errorCallback).
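A rough sketch of this approach, assuming RxJS 7 or later, where toPromise() is deprecated in favour of lastValueFrom and firstValueFrom. The helper toResult and both callbacks are hypothetical names used only for illustration. Keep in mind that a promise settles once, so only the last emitted value reaches the callback.

```typescript
import { Observable, lastValueFrom, of, throwError } from 'rxjs';

// Hypothetical callbacks for illustration.
const onNextCallback = (value: number): string => `value ${value}`;
const onErrorCallback = (error: unknown): string => `handled ${String(error)}`;

// Converts the stream to a promise that settles with the last emitted value,
// then applies the two-argument form of then().
const toResult = (source$: Observable<number>): Promise<string> =>
  lastValueFrom(source$).then(onNextCallback, onErrorCallback);

toResult(of(1, 2, 3)).then((r) => console.log(r)); // value 3
toResult(throwError(() => 'boom')).then((r) => console.log(r)); // handled boom
```

If the source completes without emitting anything, lastValueFrom rejects, so onErrorCallback would run in that case as well.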

Upvotes: 0

Goga Koreli

Reputation: 2947

What about something like this?

Here source$ is the source Observable, and you pass the corresponding callbacks to the tap operator inside pipe.

source$.pipe(
  tap({ next: onNextCallback, error: onErrorCallback })
)
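One caveat, sketched below with a made-up source$ and a log array: tap only observes notifications. Its error callback runs, but the error still propagates downstream, so recovering with a fallback value (as the second argument of promise.then would) needs something like catchError after it.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical source: emits 1 and then fails.
const source$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error('boom');
});

source$
  .pipe(
    // tap only observes: the error callback runs, but the error keeps propagating.
    tap({
      next: (value) => log.push(`tap next ${value}`),
      error: (error) => log.push(`tap error ${error}`),
    }),
    // catchError is what replaces the error with a fallback value.
    catchError(() => of(0)),
  )
  .subscribe({
    next: (value) => log.push(`subscriber next ${value}`),
    complete: () => log.push('subscriber complete'),
  });

console.log(log);
// [ 'tap next 1', 'subscriber next 1', 'tap error boom',
//   'subscriber next 0', 'subscriber complete' ]
```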

Upvotes: 0
