Yuan

Reputation: 133

Subscription triggered only once after error

I'm new to RxJS, and I'm trying to understand what happens in the following code.

To illustrate the idea, I prepared two simple promises (resolvePromise and rejectPromise): one resolves and the other rejects. A Subject instance triggers both promises (piped through mergeMap and forkJoin). At the end, I call source$.next('xxx') twice, hoping to trigger the subscription twice.

However, only the first source$.next('1') triggers the subscription; the second source$.next('2') seems to do nothing.

I guess this behavior is caused by the rejected promise, which literally "throws an exception". How can I fix this code so that the second source$.next('2') also triggers the subscription?

Thanks a lot

import { forkJoin, Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const resolvePromise = val =>
  new Promise(resolve => resolve(`resolve value: ${val}`));

const rejectPromise = err =>
  Promise.reject(`reject error: ${err}`);

const source$ = new Subject();

source$
  .pipe(mergeMap(val => {
    return forkJoin([
      resolvePromise(val), 
      rejectPromise(val)
      ]);
  }))
  .subscribe(
     console.log, 
     console.error
  );

  source$.next('1');
  source$.next('2');

Update

Based on the suggestion from Andrei Gătej below, I chose to pipe catchError after the forkJoin and put the error handling there.

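import { EMPTY, forkJoin } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

source$.pipe(
  mergeMap(val => {
    return forkJoin([
      resolvePromise(val),
      rejectPromise(val)
    ])
      .pipe(
        catchError(err => {
          // your error handling business logic
          console.error(err);
          // EMPTY replaces the deprecated empty(): complete without emitting
          return EMPTY;
        })
      );
  }))
  .subscribe(console.log);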

Upvotes: 0

Views: 443

Answers (2)

satanTime

Reputation: 13539

You need to resubscribe after the error, for example via retry or repeat.

import { EMPTY, forkJoin, Subject } from 'rxjs';
import { catchError, mergeMap, repeat } from 'rxjs/operators';

const resolvePromise = val =>
  new Promise(resolve => resolve(`resolve value: ${val}`));

const rejectPromise = err =>
  Promise.reject(`reject error: ${err}`);

const source$ = new Subject();

source$.pipe(
  mergeMap(val => forkJoin([
    resolvePromise(val), 
    rejectPromise(val)
  ])),
  catchError(() => EMPTY),
  repeat(),
).subscribe(
  console.log, 
  console.error
);

source$.next('1');
source$.next('2');

Upvotes: 0

Andrei Gătej

Reputation: 11924

reject promise, which literally "throws an exception"

I'd say you're right, judging by how RxJS handles promises internally:

promise.then(
  (value) => {
    if (!subscriber.closed) {
      subscriber.next(value);
      subscriber.complete();
    }
  },
  (err: any) => subscriber.error(err) // !
)

Source.

It's also worth mentioning that, as the snippet above shows, as soon as the promise resolves, the resolved value and a complete notification are passed along.

The complete part is very important for the forkJoin operator.

forkJoin subscribes to all the provided observables and waits until all of them complete. It emits an array of their last values only if every observable emitted at least once before completing.

When one observable emits an error notification, forkJoin will immediately send that error notification further in the chain.


Here's my approach:

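import { forkJoin, from, of } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

source$
  .pipe(mergeMap(val => {
    return forkJoin([
      resolvePromise(val),
      // Turn the rejection into a regular value so forkJoin can still complete
      from(rejectPromise(val)).pipe(catchError(err => of(err)))
    ]);
  }))
  .subscribe(
     console.log,
     console.error
  );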

We wrap the promise in from so that we can intercept its error early: catchError replaces the failed observable with of(err), which emits the error as a regular value and then sends a complete notification. That lets forkJoin complete normally.

Upvotes: 1
