Alex0007

Reputation: 544

How to make an observable continue emitting values even if an error was thrown

The error is thrown and replaced, but then execution ends. How can I make the observable emit 10 elements?

const Rx = require('rxjs/Rx')

Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .take(10)
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .do(console.log, console.error)
  .subscribe()

Upvotes: 1

Views: 1740

Answers (3)

subhaze

Reputation: 8855

You could supply a function that handles the errors and returns an observable. You'll need to use flatMap instead of map, since the function now returns an observable for each value and flatMap flattens those back into a single stream.

function handleError(cb){
    return (val) => {
        try{
            return Rx.Observable.of(cb(val));
        }catch(err){
            console.error(`${err}`);
            return Rx.Observable.empty();
        }
    }
}

Rx.Observable.interval(1000)
    .flatMap(handleError(i => {
        if (i === 2) throw(new Error('omg'))
        return i;
    }))
    .take(10)
    .do(console.log)
    .subscribe()

// logs (the "Error: omg" line comes from console.error; it is not an emitted value)
// 0
// 1
// "Error: omg"
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
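
The wrap-and-flatten idea can be tried without RxJS or timers. Below is a minimal sketch that uses plain arrays as a stand-in for observables: a one-element array plays the role of Rx.Observable.of, an empty array plays the role of Rx.Observable.empty, and Array.prototype.flatMap does the flattening. The name handleErrorSketch and the errors array are assumptions made for this illustration, not part of RxJS.

```javascript
// Array analogue of the pattern above: wrap each callback result in a
// container, and return an empty container when the callback throws.
// `handleErrorSketch` is a hypothetical name, not an RxJS API.
function handleErrorSketch(cb, errors) {
  return (val) => {
    try {
      return [cb(val)];         // like Rx.Observable.of(cb(val))
    } catch (err) {
      errors.push(String(err)); // recorded instead of console.error, for inspection
      return [];                // like Rx.Observable.empty()
    }
  };
}

const errors = [];
const input = [0, 1, 2, 3, 4];

// flatMap flattens one level, so the empty array simply vanishes.
const output = input.flatMap(handleErrorSketch((i) => {
  if (i === 2) throw new Error('omg');
  return i;
}, errors));

console.log(output); // [0, 1, 3, 4]
console.log(errors); // ['Error: omg']
```

The failing item is dropped and every other item passes through, because the try/catch is scoped to a single value rather than to the whole sequence.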


Another example, which doesn't quite fit your code but is worth mentioning: the lead developer of RxJS, Ben Lesh, touched on this issue in a post called "On The Subject Of Subjects (in RxJS)". There's a section halfway through called "Gotchas in RxJS".

[...] Since Rx observables do not “trap” errors, we can run into some strange behavior here. Error “trapping” is a behavior I myself have derided Promises for implementing, but in multicast scenarios it may be the right move. What I mean when I say Rx observable does not “trap” errors is basically that when an error percolates to the end of the observer chain, if the error is unhandled, it will be re-thrown.

Here is one code example from that section (the simplest but not most performant):

const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // magic here
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2 
// "A" 3
// "C" 3
// ... etc


Upvotes: 1

Asti

Reputation: 12667

The Observable contract is OnNext* (OnError | OnCompleted)?: any number of values, followed by at most one terminal notification.

Once the sequence terminates, downstream operators must unsubscribe, so a terminated subscription cannot be resumed. You can only resubscribe to the pipeline. If your observable is cold, you can use the retry operator to resubscribe.

observable
.retry()
.take(10)
.subscribe()
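
To make the contract concrete, here is a minimal sketch of a hand-rolled observable. It is an assumption-laden toy, not how RxJS is implemented: createObservable and collect are hypothetical helpers, and everything runs synchronously. It shows that nothing is delivered after a terminal notification, and that resubscribing to a cold source starts from the beginning.

```javascript
// Hypothetical minimal observable, NOT RxJS's implementation.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let stopped = false; // set once error/complete has been delivered
      producer({
        next(value) { if (!stopped) observer.next(value); },
        error(err) { if (!stopped) { stopped = true; observer.error(err); } },
        complete() { if (!stopped) { stopped = true; observer.complete(); } },
      });
    },
  };
}

// Cold source: the producer runs from scratch for every subscriber.
const cold = createObservable((o) => {
  o.next(0);
  o.next(1);
  o.error(new Error('omg'));
  o.next(3); // ignored: the sequence has already terminated
});

// Subscribe and record every notification that arrives.
function collect(source) {
  const seen = [];
  source.subscribe({
    next: (v) => seen.push(v),
    error: (e) => seen.push('error: ' + e.message),
    complete: () => seen.push('complete'),
  });
  return seen;
}

const first = collect(cold);  // [0, 1, 'error: omg']
const second = collect(cold); // resubscribing starts over: [0, 1, 'error: omg']
console.log(first, second);
```

This is why retry on interval gives you 0, 1, 0, 1, ... rather than continuing at 3: each resubscription is a fresh run of the source.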

Upvotes: 0

Bradford

Reputation: 4193

I'm no RxJS guru, but I'd like to try to answer this.

Throwing an error with RxJS terminates the observable. As a result, you cannot resume it, but can only attempt to retry/repeat the observable.

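If you do not want to replay the error and must only take the original 10 elements, then you could return null instead of throwing the error and filter(x => x !== null) before you take(10). (A plain filter(x => x) would also discard the 0 that interval emits first, because 0 is falsy.)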
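
A minimal sketch of the sentinel idea, with a plain array standing in for the values interval would emit (arrays share map and filter with observables, so the logic carries over). It compares a truthiness filter with an explicit null check; the variable names are made up for this illustration.

```javascript
// A plain array stands in for the values interval would emit.
const ticks = [0, 1, 2, 3, 4, 5];

// Replace the failing item with a null sentinel instead of throwing.
const mappedTicks = ticks.map((i) => (i === 2 ? null : i));

// Truthiness filter: removes null, but also removes 0 because 0 is falsy.
const truthy = mappedTicks.filter((x) => x);

// Explicit null check: removes only the sentinel.
const nonNull = mappedTicks.filter((x) => x !== null);

console.log(truthy);  // [1, 3, 4, 5]
console.log(nonNull); // [0, 1, 3, 4, 5]
```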

Otherwise, you can use retryWhen to resubscribe to the observable on error. Note that this emits the first 2 items, fails, and then starts over from 0, 1, ... The scan accumulator allows two retries and rethrows the third error, which then reaches catch; take(10) still caps the output at 10 items.

Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .retryWhen((errors) => errors.scan(
    (errorCount, err) => {
        if(errorCount >= 2) {
            throw err;
        }

        return errorCount + 1;
    }, 0)
  )
  .take(10)
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .subscribe((x) => console.log('result', x))
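
The retry counting can be traced without timers. Below is a rough synchronous sketch, not RxJS code: subscribeOnce stands in for one subscription to the interval + map pipeline, and runWithRetryLimit is a hypothetical helper that mirrors the scan accumulator (start at 0, resubscribe while the count is below the limit, rethrow otherwise).

```javascript
// Synchronous stand-in for one subscription to the interval + map pipeline:
// it emits 0, 1 and then throws when it reaches 2.
function subscribeOnce(emit) {
  for (let i = 0; i < 10; i++) {
    if (i === 2) throw new Error('omg');
    emit(i);
  }
}

// Hypothetical helper mirroring the scan accumulator: errorCount starts at 0,
// each error below the limit triggers a resubscription, the next one is rethrown.
function runWithRetryLimit(subscribe, limit) {
  const emitted = [];
  let errorCount = 0;
  for (;;) {
    try {
      subscribe((v) => emitted.push(v));
      return { emitted, error: null }; // completed without error
    } catch (err) {
      if (errorCount >= limit) return { emitted, error: err };
      errorCount += 1; // same step as `return errorCount + 1` in scan
    }
  }
}

const result = runWithRetryLimit(subscribeOnce, 2);
console.log(result.emitted);       // [0, 1, 0, 1, 0, 1]
console.log(result.error.message); // 'omg'
```

With a limit of 2 the source is subscribed three times in total, so six values come through before the error is finally passed downstream.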

You can also use repeat to resubscribe whenever the observable completes. This is likely not what you want, but I wanted to show it to you as an option anyway. Pay attention to where you place take: placed after repeat, it also counts the replacement value emitted by catch toward the 10 items.

Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .repeat()
  .take(10)
  .subscribe((x) => console.log('result', x))
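
To see how take interacts with repeat here, the sequence can be sketched with generators instead of observables. This is only an analogy under stated assumptions: oneCycle, repeatForever and takeN are hypothetical helpers, and 'caught' stands in for the replacement string from catch.

```javascript
// One pass through the source followed by the catch replacement:
// 0, 1, then the value catch substitutes for the error.
function* oneCycle() {
  yield 0;
  yield 1;
  yield 'caught'; // stands in for the replacement string
}

// Like repeat(): start the cycle again every time it finishes.
function* repeatForever(makeCycle) {
  for (;;) yield* makeCycle();
}

// Like take(n): stop after n values, whatever they are (n must be >= 1).
function takeN(iterable, n) {
  const out = [];
  for (const v of iterable) {
    out.push(v);
    if (out.length === n) break;
  }
  return out;
}

const taken = takeN(repeatForever(oneCycle), 10);
console.log(taken);
// [0, 1, 'caught', 0, 1, 'caught', 0, 1, 'caught', 0]
```

Three of the ten slots are used up by the replacement value, and the numbers never get past 1 because every cycle restarts the source.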

Upvotes: 0
