Reputation: 31805
In this example, the source Observable generates numbers between 0 and 6 and treats 6 as an error. How can I create a target Observable that brings the errors back into the normal event flow?
I tried this, but it stops working when an error occurs.
const { of, interval, throwError } = rxjs;
const { switchMap, map, catchError } = rxjs.operators;
const source = interval(1000).pipe(
  map(() => Math.floor(Math.random() * 7)),
  switchMap(result => result < 6 ? of(result) : throwError(result))
);
const target = source.pipe(catchError(err => of(err)));
target.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>
Upvotes: 0
Views: 56
Reputation: 11924
Just wanted to add that the callback passed to catchError is given a second argument, which represents the caught observable. Basically, if you want to re-subscribe to the source when an error occurs, you can also do this:
src$.pipe(
  catchError((err, caught$) => caught$)
).subscribe(/* ... */)
Upvotes: 1
Reputation: 31805
As the contract says here: http://reactivex.io/documentation/contract.html
When an Observable does issue an OnCompleted or OnError notification, the Observable may release its resources and terminate, and its observers should not attempt to communicate with it any further.
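To make that rule concrete without pulling in RxJS, here is a minimal sketch. Note that createObservable and sourceFrom are hypothetical helpers written only for this illustration (they are not RxJS APIs), and the sketch assumes a synchronous producer. It shows that once an error has been delivered, later values never reach the observer.

```javascript
// Hypothetical minimal observable, for illustration only; this is not RxJS.
// `producer` receives an observer and pushes notifications into it.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let stopped = false;
      producer({
        next(value) {
          if (!stopped) observer.next(value);
        },
        error(err) {
          if (stopped) return;
          stopped = true; // terminal: nothing is delivered after this point
          observer.error(err);
        },
      });
    },
  };
}

// Emits the given values in order and treats 6 as an error,
// loosely mirroring the source in the question.
function sourceFrom(values) {
  return createObservable(observer => {
    for (const value of values) {
      if (value === 6) observer.error(value);
      else observer.next(value);
    }
  });
}

const delivered = [];
sourceFrom([1, 6, 2]).subscribe({
  next: value => delivered.push(value),
  error: err => delivered.push('Error: ' + err),
});
console.log(delivered); // the 2 that follows the error is never delivered
```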
That means we have to return the source Observable again after an error occurs, like this:
const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, catchError } = rxjs.operators;
const source = interval(1000).pipe(
  map(() => Math.floor(Math.random() * 7)),
  switchMap(result => result < 6 ? of(result) : throwError(result))
);
// Emit the error as a regular value, then subscribe to the source again with the same error handling.
const target = source => source.pipe(
  catchError(err => concat(of('Error: ' + err), target(source)))
);
target(source).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>
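The same idea can be sketched in plain JavaScript to show why resubscribing works. Everything here is an assumption made for illustration: createObservable, sharedSource and recover are hypothetical helpers, not RxJS APIs, and the source draws from one shared iterator so that the run is deterministic and finite (the real snippet gets fresh random numbers from a new interval instead).

```javascript
// Hypothetical minimal observable, for illustration only; this is not RxJS.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let stopped = false;
      producer({
        get closed() {
          return stopped;
        },
        next(value) {
          if (!stopped) observer.next(value);
        },
        error(err) {
          if (stopped) return;
          stopped = true; // this subscription is finished for good
          observer.error(err);
        },
      });
    },
  };
}

// Every subscription pulls from the same iterator and treats 6 as an error.
function sharedSource(values) {
  const iterator = values[Symbol.iterator]();
  return createObservable(observer => {
    while (!observer.closed) {
      const { value, done } = iterator.next();
      if (done) return;
      if (value === 6) observer.error(value);
      else observer.next(value);
    }
  });
}

// Turns each error into a regular value, then subscribes to the source again.
function recover(source) {
  return createObservable(observer => {
    const run = () =>
      source.subscribe({
        next: value => observer.next(value),
        error: err => {
          observer.next('Error: ' + err);
          run(); // the failed subscription is dead, so start a new one
        },
      });
    run();
  });
}

const seen = [];
recover(sharedSource([1, 6, 2, 6, 3])).subscribe({
  next: value => seen.push(value),
  error: err => seen.push('Unhandled: ' + err),
});
console.log(seen); // [ 1, 'Error: 6', 2, 'Error: 6', 3 ]
```

The nested run() call plays the role of the recursive target(source) call in the snippet above: the subscription that failed is never reused, a new one takes its place.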
Upvotes: 1