Guerric P

Reputation: 31805

Observable that continues emitting events after having emitted an error

As the contract says here: http://reactivex.io/documentation/contract.html

When an Observable does issue an OnCompleted or OnError notification, the Observable may release its resources and terminate, and its observers should not attempt to communicate with it any further.
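As a rough illustration of that clause, here is a minimal sketch of a stand-in Observable that drops anything sent after an error. The createObservable helper is hypothetical and written only for this example; it is not how RxJS is implemented.

```javascript
// Hypothetical stand-in for an Observable, only meant to illustrate the contract.
function createObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      // Wrap the observer so nothing gets through after error/complete.
      const safeObserver = {
        next(value) { if (!closed) observer.next(value); },
        error(err) { if (!closed) { closed = true; observer.error(err); } },
        complete() { if (!closed) { closed = true; observer.complete(); } },
      };
      producer(safeObserver);
      return { unsubscribe() { closed = true; } };
    },
  };
}

const received = [];
createObservable(subscriber => {
  subscriber.next(1);
  subscriber.error('boom');
  subscriber.next(2); // ignored: the stream has already terminated
}).subscribe({
  next: value => received.push(value),
  error: err => received.push('Error: ' + err),
  complete: () => received.push('done'),
});

console.log(received); // [ 1, 'Error: boom' ]
```

Under that reading, getting more values after an error requires a new subscription, which is what the target function below does.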

In this snippet, I create an Observable that emits an error at random, and my target function creates another Observable from it that doesn't stop listening for events after an error occurs.

const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, catchError } = rxjs.operators;

const source = interval(1000).pipe(
        map(() => Math.floor(Math.random() * 7)),
        switchMap(result => result < 6 ? of(result) : throwError(result)));

const target = source => source.pipe(catchError(err => concat(of('Error: ' + err), target(source))));

target(source).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>

One use case for this snippet would be an autocomplete that doesn't stop working after an HTTP error. Is that an anti-pattern? Would it cause something like memory leaks or a call stack that grows forever?

Upvotes: 0

Views: 258

Answers (2)

satanTime

Reputation: 13539

It's totally fine to catch the errors you want to handle and switch them to another stream. You can also use the retry operator so that the stream finally fails after N attempts.

The only caveat: don't forget to unsubscribe once you no longer need the stream, and it won't produce memory leaks.

The best option is to handle the error close to where it fails; then you don't need to return the original stream again or use the repeat / retry operators.

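const { EMPTY } = rxjs;
const { debounceTime, switchMap, catchError } = rxjs.operators;

// searchTerm$ is the stream of typed terms; this.http is assumed to be an HTTP client such as Angular's HttpClient.
const source = searchTerm$.pipe(
  debounceTime(250), // debounce() expects a duration selector; debounceTime() takes milliseconds.
  switchMap(term => this.http.get('/suggestion?=' + term).pipe(
    catchError(() => EMPTY), // won't break the parent stream of terms.
  )),
);

source.subscribe(console.log); // will receive emissions until unsubscribed.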

Upvotes: 0

Fan Cheung

Reputation: 11345

To my knowledge, no: you are just passing on the source observable, so the subscription won't end.

In addition, the source observable is timer-based, so it won't block the main thread. But if you replace interval with a cold, synchronous of(something) observable and the error is repeatable, your main thread will freeze.
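To make the synchronous case more concrete, here is a small model of resubscribing on error, written without RxJS. The alwaysFails source, the resubscribeOnError function and the maxAttempts guard are all hypothetical names for this sketch; it only mimics the shape of the recursive target function from the question and says nothing about RxJS internals.

```javascript
// Hypothetical cold source that fails synchronously, before subscribe() returns.
const alwaysFails = {
  subscribe(observer) {
    observer.error('boom');
  },
};

// Model of "on error, emit a message and resubscribe to the same source".
// maxAttempts is an illustrative guard so that the sketch terminates.
function resubscribeOnError(source, maxAttempts) {
  let attempts = 0;
  let depth = 0;    // nested subscribe calls currently on the stack
  let maxDepth = 0; // deepest nesting observed
  const messages = [];

  const subscribeOnce = () => {
    depth += 1;
    maxDepth = Math.max(maxDepth, depth);
    source.subscribe({
      next: value => messages.push(value),
      error: err => {
        messages.push('Error: ' + err);
        attempts += 1;
        // The failed subscribe() call has not returned yet,
        // so this resubscription nests inside it.
        if (attempts < maxAttempts) subscribeOnce();
      },
    });
    depth -= 1;
  };

  subscribeOnce();
  return { messages, maxDepth };
}

const result = resubscribeOnError(alwaysFails, 5);
console.log(result.maxDepth); // 5: one nested call per synchronous failure
```

With a timer-based source, each error would arrive on a fresh call stack, so in this model the nesting would not build up the same way.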

There is a cleaner pattern:

source.pipe(catchError(err=>...of(err)),repeat())

Demo:

const { of, interval, concat, throwError } = rxjs;
const { switchMap, map, repeat, catchError } = rxjs.operators;

const source = interval(1000).pipe(
        map(() => Math.floor(Math.random() * 7)),
        switchMap(result => result < 6 ? of(result) : throwError(result)),
        catchError(err => of('Error: ' + err)),
        repeat());

source.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js"></script>

Upvotes: 1
