Reputation: 544
The error is thrown and replaced, but then the stream ends. How can I make the observable emit all 10 elements?
const Rx = require('rxjs/Rx')
Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .take(10)
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .do(console.log, console.error)
  .subscribe()
Upvotes: 1
Views: 1740
Reputation: 8855
You could supply a function that handles the errors and returns an observable. You'll need to use flatMap instead of map, since the wrapped function returns an observable for each value rather than a plain value.
function handleError(cb) {
  return (val) => {
    try {
      return Rx.Observable.of(cb(val));
    } catch (err) {
      console.error(`${err}`);
      return Rx.Observable.empty();
    }
  }
}
Rx.Observable.interval(1000)
  .flatMap(handleError(i => {
    if (i === 2) throw(new Error('omg'))
    return i;
  }))
  .take(10)
  .do(console.log)
  .subscribe()
// emits
// 0
// 1
// "Error: omg"
// 3
// 4
// 5
// 6
// 7
// 8
// 9
// 10
Another example is worth mentioning, even though it doesn't quite fit your code. The lead developer of RxJS, Ben Lesh, touched on this issue in a post called On The Subject Of Subjects (in RxJS). There's a section halfway through called "Gotchas in RxJS."
[...] Since Rx observables do not “trap” errors, we can run into some strange behavior here. Error “trapping” is a behavior I myself have derided Promises for implementing, but in multicast scenarios it may be the right move. What I mean when I say Rx observable does not “trap” errors is basically that when an error percolates to the end of the observer chain, if the error is unhandled, it will be re-thrown.
Here is one code example from that section (the simplest but not most performant):
const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // magic here
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... etc
Upvotes: 1
Reputation: 12667
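The Observable contract is OnNext*(OnError|OnCompleted)?, meaning zero or more OnNext notifications followed by at most one terminal notification (OnError or OnCompleted).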
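Once the sequence terminates, downstream operators must unsubscribe, and the only way to continue is to resubscribe to the pipeline. If your observable is cold, you can use the retry operator to resubscribe; note that a cold observable starts over from the beginning on each resubscription.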
observable
  .retry()
  .take(10)
  .subscribe()
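To make the contract concrete, here is a minimal sketch in plain JavaScript, with no RxJS involved. It reads the grammar as "any number of OnNext, then at most one terminal notification". The helper name followsContract and the string labels 'next', 'error' and 'complete' are made up for illustration and are not part of any RxJS API.

```javascript
// Hypothetical helper: checks a list of notification kinds against
// the grammar OnNext* (OnError | OnCompleted)?
function followsContract(kinds) {
  const terminalIndex = kinds.findIndex(
    (k) => k === 'error' || k === 'complete'
  );
  // No terminal notification yet: valid as long as every entry is 'next'.
  if (terminalIndex === -1) {
    return kinds.every((k) => k === 'next');
  }
  // A terminal notification must be the last entry,
  // and everything before it must be 'next'.
  return (
    terminalIndex === kinds.length - 1 &&
    kinds.slice(0, terminalIndex).every((k) => k === 'next')
  );
}

console.log(followsContract(['next', 'next', 'complete'])); // true
console.log(followsContract(['next', 'error', 'next']));    // false
```

The second call is the situation in the question: a value arriving after an error is not a valid sequence, which is why the stream cannot simply carry on after the throw.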
Upvotes: 0
Reputation: 4193
I'm no RxJS guru, but I'd like to try to answer this.
Throwing an error with RxJS terminates the observable. As a result, you cannot resume it, but can only attempt to retry/repeat the observable.
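If you do not want to replay the source after an error and only need the original 10 elements, you could return null instead of throwing new Error, and then filter(x => x !== null) before you take(10). A plain truthiness filter such as filter(x => x) would also discard the 0 that interval emits first.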
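One caveat when filtering out the null placeholder: interval starts at 0, which is falsy, so a truthiness filter would drop it too. Below is a rough sketch of the difference, using a plain array as a stand-in for the stream. The name ticks is hypothetical, and the array methods map, filter and slice stand in for the RxJS map, filter and take operators.

```javascript
// Plain-array stand-in for the first 12 values of interval(1000).
const ticks = Array.from({ length: 12 }, (_, i) => i);

// Return null instead of throwing for the bad value.
const mapped = ticks.map((i) => (i === 2 ? null : i));

// filter(x => x) drops every falsy value, so 0 is lost along with null.
const truthyOnly = mapped.filter((x) => x).slice(0, 10);

// An explicit null check keeps 0 and removes only the placeholder.
const nonNull = mapped.filter((x) => x !== null).slice(0, 10);

console.log(truthyOnly.join(',')); // 1,3,4,5,6,7,8,9,10,11
console.log(nonNull.join(','));    // 0,1,3,4,5,6,7,8,9,10
```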
Otherwise, you can use retryWhen to resubscribe to the observable on error. Note that this emits the first 2 items, fails, and then starts over from 0, 1, ... In the example below it gives up on the third error, after two retries, at which point the catch fallback is emitted. take(10) still limits the total to 10 items.
Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .retryWhen((errors) => errors.scan(
    (errorCount, err) => {
      if (errorCount >= 2) {
        throw err;
      }
      return errorCount + 1;
    }, 0)
  )
  .take(10)
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .subscribe((x) => console.log('result', x))
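To see when the retryWhen example stops retrying, the scan accumulator can be stepped by hand outside of RxJS. This is only a sketch. The function names countErrors and retriesBeforeGivingUp are hypothetical, and the loop merely imitates how scan feeds each error to the accumulator, starting from the seed 0.

```javascript
// The accumulator passed to errors.scan(...) in the retryWhen example.
function countErrors(errorCount, err) {
  if (errorCount >= 2) {
    throw err; // third error: give up and propagate
  }
  return errorCount + 1;
}

// Hypothetical driver: feeds successive errors through the accumulator
// the way scan would, starting from the seed 0.
function retriesBeforeGivingUp(errors) {
  let count = 0;
  for (const err of errors) {
    try {
      count = countErrors(count, err);
    } catch (e) {
      return count; // the accumulator re-threw, so no further retry
    }
  }
  return count;
}

const omg = new Error('omg');
console.log(retriesBeforeGivingUp([omg, omg, omg, omg])); // 2
```

The first two errors each produce a value, which triggers a resubscription. The third makes the accumulator throw, so the error reaches catch.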
You can also use repeat to resubscribe whenever the observable completes. This is likely not what you want, but I wanted to show it as an option anyway. Pay attention to where you place take: when it comes after repeat, it also counts the value emitted by the catch observable.
Rx.Observable.interval(1000)
  .map((i) => {
    if (i === 2) throw(new Error('omg'))
    return i
  })
  .catch((err) => {
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit')
  })
  .repeat()
  .take(10)
  .subscribe((x) => console.log('result', x))
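As a rough illustration of which 10 values come out of the repeat example, here is a synchronous simulation in plain JavaScript. It is a sketch under assumptions, not RxJS code. onePass and repeatAndTake are hypothetical names, and 'fallback' stands in for the string emitted by the catch observable.

```javascript
// One pass of the source: 0, 1, then the error at i === 2 is replaced
// by the fallback value from catch, after which the pass completes.
function* onePass() {
  yield 0;
  yield 1;
  yield 'fallback'; // stands in for the string emitted by catch
}

// Hypothetical stand-in for .repeat().take(n): restart the pass
// whenever it completes, and stop after n values in total.
function repeatAndTake(n) {
  const out = [];
  while (out.length < n) {
    for (const value of onePass()) {
      out.push(value);
      if (out.length === n) break;
    }
  }
  return out;
}

console.log(repeatAndTake(10).join(','));
// 0,1,fallback,0,1,fallback,0,1,fallback,0
```

Three of the ten slots are taken by the fallback value, which is the "counting" effect described above.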
Upvotes: 0