David

Reputation: 16038

How do I throw an error on a behaviour subject and continue the stream?

On one end, I have a stream which may occasionally throw an error:

this.behaviorSubject.error(error)

Later on, however, I want to continue the stream:

this.behaviorSubject.next(anotherValue)

On the other end, I have a subscriber subscribed to behaviorSubject.asObservable().

In the subscription, I'm handling the value and the error:

.subscribe( 
   ( value ) =>  {  /* ok */ },
   ( error ) =>  {  /* some error */ }
);

I want the effect to be the same as a simple onSuccess and onError callback, where onError is called every time an error occurs and doesn't prevent future onSuccess calls from being made. How do I do this with RxJS?

I've looked into catch but it seems to just prevent error from being called on subscribers.

Upvotes: 62

Views: 43757

Answers (4)

Chaos Crafter

Reputation: 689

As others have said, the error result is expected to be terminal. Personally, I think that in your case there are three types of results (technically four):

  1. The ideal result is the success case, which calls next().
  2. A lethal failure (an out-of-memory error or any other form of "call can't continue" error) should call error().
  3. The third form, and the one that is key to your problem, is the non-terminal error: the "result was not a success" form. Because you mean to continue, it is not an error in the RxJS sense. It is merely another type of result, one that says something else happened.
  4. The fourth form is "processing completed": I have done all I can and am exiting without error, which calls complete().

Now I'm not sure of the details, but as I recall TypeScript can handle union types (if not, you might have to play with a result type of "any"). With unions you can declare your object as, for example, Subject<string|failtype>. The point here is that you can send different kinds of results through next(). You'd do something like the following:

DoCoolThingFunction():Subject<string|failtype>
{
    const response = new Subject<string|failtype>();
    deeperPeriodicAsyncOperation.subscribe((result) => {

      if (result is something I like) {
        response.next(result.toString());

      } else if (result is final value) {
        response.next(result.toString());
        response.complete();

      } else if (result is something terminal) {
        response.error(result);

      } else if (result is non-terminal error) {
        response.next(new failtype(result));
      }
    });
    return response;
}

Basically, this is saying "An error in this range is non-terminal. As such it is not an error, it is just a different kind of operational data".

Of course it is up to your receiving code to determine which type of result it has been handed. I've no idea if there are any neat ways to do that. It'd be great if the result handler could have multiple different typed responses ((result:string) =>{}, (result:failtype)=>{}) etc. but that's not really a topic for this thread.
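
One way the receiving code could tell the two kinds of result apart is ordinary TypeScript narrowing. The sketch below is only an illustration: FailType, describeResult and the sample values are hypothetical names, not part of RxJS, and describeResult is the kind of function you might call from your next handler.

```typescript
// Hypothetical wrapper for a non-terminal failure (stands in for "failtype" above).
class FailType {
  constructor(public readonly reason: string) {}
}

type Result = string | FailType;

// Narrow the union with instanceof; TypeScript knows the type in each branch.
function describeResult(result: Result): string {
  if (result instanceof FailType) {
    return `failed: ${result.reason}`; // non-terminal error, the stream keeps going
  }
  return `ok: ${result}`; // plain success value
}

// Usage: this is what a next handler would do with each emitted value.
const received: string[] = [
  describeResult('first'),
  describeResult(new FailType('timeout')),
  describeResult('second'),
];
console.log(received);
```

Using a class for the failure case keeps the check to a single instanceof; a tagged object literal would work just as well if you prefer plain data.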

Upvotes: 0

Olaf Horstmann

Reputation: 16892

Short answer: It's not possible.

How to work with this: The basic concept of RxJS is that any error or complete call will "kill" a stream. This concept forces you not to "just throw around errors here and there as you please" but to handle errors and the flow of data within your application properly. A BehaviorSubject, for example, is typically meant to hold data; it should not also be used to carry out the retrieval or creation of that data and handle the errors that might occur along the way.
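
To make the "kill" part concrete, here is a small sketch, assuming RxJS 6 or 7 import paths (the log array is only there for illustration). Once error() has been called, a later next() is silently ignored, and a new subscriber receives the stored error instead of a value.

```typescript
import { BehaviorSubject } from 'rxjs';

const log: string[] = [];
const subject = new BehaviorSubject<number>(0);

subject.asObservable().subscribe({
  next: value => log.push(`value: ${value}`),
  error: err => log.push(`error: ${err.message}`),
});

subject.next(1);
subject.error(new Error('boom')); // terminates the subject
subject.next(2);                  // ignored: the subject is already stopped

// A late subscriber gets the error, not the last value.
subject.subscribe({
  next: value => log.push(`late value: ${value}`),
  error: err => log.push(`late error: ${err.message}`),
});

console.log(log);
```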

So if you want to go by the book, you should split up your flow into two parts:

  1. Retrieval/creation of the data: A stream that runs once and then completes and/or throws an error whenever one occurs. When the data is retrieved, it is sent to the store.
  2. The store (e.g. as in your case: a bunch of BehaviorSubjects): Only valid data arrives in the store, this means that no error-handling is done here and all parts relying on the store can trust in the store that it holds the correct data.

As an example your data flow could look as follows (as a rough sketch):

store.ts

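// A BehaviorSubject needs an initial value; null stands for "nothing yet".
dataStore: BehaviorSubject<IData | null> = new BehaviorSubject<IData | null>(null);
errorMessage: BehaviorSubject<IErrorMsg | null> = new BehaviorSubject<IErrorMsg | null>(null);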

data-retrieval.ts

fetchDataById(id: string) {
    httpService.get(`some/rest/endpoint/${id}`)
        .subscribe(handleData, handleError);
}

handleData(data: IData) {
    errorMessage.next(null);
    dataStore.next(data);
}

handleError(error: Error) {
    errorMessage.next(error.message);
    dataStore.next(null);
}
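
A self-contained variant of the same idea, written against the RxJS 7 API, might look like the sketch below. fakeFetch is a hypothetical stand-in for httpService.get, and the store is reduced to two module-level subjects; treat it as an outline under those assumptions rather than a finished design.

```typescript
import { BehaviorSubject, Observable, of, throwError } from 'rxjs';

interface IData { id: string; }

// The store: only valid data (or null) ever arrives here, never an error.
const dataStore = new BehaviorSubject<IData | null>(null);
const errorMessage = new BehaviorSubject<string | null>(null);

// Hypothetical stand-in for httpService.get: fails for the id "bad".
function fakeFetch(id: string): Observable<IData> {
  return id === 'bad'
    ? throwError(() => new Error(`cannot load ${id}`))
    : of({ id });
}

// Retrieval: one short-lived stream per request; its error ends only that stream.
function fetchDataById(id: string): void {
  fakeFetch(id).subscribe({
    next: data => { errorMessage.next(null); dataStore.next(data); },
    error: (err: Error) => { errorMessage.next(err.message); dataStore.next(null); },
  });
}

const seen: string[] = [];
dataStore.subscribe(d => seen.push(d ? `data ${d.id}` : 'no data'));
errorMessage.subscribe(m => seen.push(m ? `error ${m}` : 'no error'));

fetchDataById('1');
fetchDataById('bad');
fetchDataById('2'); // the store still works after the failed request
console.log(seen);
```

Because the failing stream is a throwaway one created per request, the two subjects in the store are never errored and keep serving their subscribers.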

"But this looks like a lot of overhead..." - True, however it ensures a clean and easy-to-understand flow of data within your application, that is easy to test and maintain. Also there are ready-to-use store-concepts like ngrx or redux that could be used.

Upvotes: 53

KickerKeeper

Reputation: 265

This may not work for your situation, but I ran into this same issue when working with Angular 2: we would navigate across screens and wanted the service to retry an API call rather than just call the error function again. That actually caused bigger issues, because the function was called in our constructor and the error function would try to update the UI, which was not yet ready.

What I did seems to work fine and be pretty clean: I reset the Subject in the error handler by creating a new one.

subject.subscribe( 
   ( value ) =>  {  /* ok */ },
   ( error ) =>  {  
      //handle error
      //reset subject
      this.subject = new Subject<any>();
   }
);

This works in our case because every time you navigate to a screen, the subscriptions from the old screen are torn down and new ones are set up on the new screen, so the new subject won't hurt anything.
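
One caveat with this approach: a subscriber attached to the old subject does not follow the reassignment, so something has to subscribe again to the new one. A rough sketch of that, assuming RxJS 6 or 7 and using a hypothetical Channel class, could look like this:

```typescript
import { Subject } from 'rxjs';

// Hypothetical holder that replaces its subject after an error and re-subscribes.
class Channel {
  subject = new Subject<string>();
  readonly log: string[] = [];

  constructor() {
    this.connect();
  }

  private connect(): void {
    this.subject.subscribe({
      next: value => this.log.push(`value: ${value}`),
      error: err => {
        this.log.push(`error: ${err.message}`);
        // The old subject is dead: create a fresh one and subscribe to it.
        this.subject = new Subject<string>();
        this.connect();
      },
    });
  }
}

const channel = new Channel();
channel.subject.next('a');
channel.subject.error(new Error('oops'));
channel.subject.next('b'); // goes to the replacement subject
console.log(channel.log);
```

Producers must read channel.subject each time they emit; anything that cached a reference to the old subject would still be talking to the terminated one.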

Upvotes: 1

Mark van Straten

Reputation: 9425

Rx is fundamentally built upon the concept that an observable is either active or finalized (onComplete or onError). When an Observable finalizes, it unsubscribes from its upstream Observable. No .catch can fix that behaviour; it only gives you the option to map the error to something else.

Rx.Observable.interval(500)
  .mergeMap(i => i % 3 == 2 ? Rx.Observable.throw(new Error('kboom')) : Rx.Observable.of(i))
  .catch(err => Rx.Observable.of(err.message))
  .subscribe(
    val => console.log('val: ' + val),
    err => console.log('err: ' + err),
    () => console.log('stream completed')
  )

Note that this example completes after 3 emissions (0, 1 and the mapped error message) instead of carrying on with later values.
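
For reference, roughly the same experiment with RxJS 7 pipeable operators is sketched below. It swaps interval for a synchronous range(0, 5) so the result is easy to inspect; that substitution and the out array are assumptions made for this illustration, not part of the example above.

```typescript
import { of, range, throwError } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

const out: string[] = [];

range(0, 5) // would emit 0, 1, 2, 3, 4 if nothing failed
  .pipe(
    mergeMap(i => (i % 3 === 2 ? throwError(() => new Error('kboom')) : of(i))),
    // catchError replaces the failed stream; it does not resume the source.
    catchError((err: Error) => of(err.message)),
  )
  .subscribe({
    next: val => out.push('val: ' + val),
    error: err => out.push('err: ' + err),
    complete: () => out.push('stream completed'),
  });

console.log(out);
```

The values 3 and 4 never arrive: by the time catchError substitutes its replacement observable, the subscription to the source has already been torn down.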

When you invoke this.behaviorSubject.error(error) it will internally finalize the Observable contained in your Subject. If you want to somehow emit errors then you need to make your errors non-error values:

this.behaviorSubject.next({ value: 'somevalue' });
this.behaviorSubject.next({ error: error });
this.behaviorSubject.next({ value: 'somevalue' });

Then you are able to distinguish based on the properties on your emitted value what action you should take.
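
A minimal sketch of the receiving side, assuming RxJS 6 or 7 and a hypothetical Envelope type for the wrapped values, might be:

```typescript
import { BehaviorSubject } from 'rxjs';

// Hypothetical envelope type: each emission has exactly one of the two shapes.
type Envelope<T> = { value: T } | { error: Error };

const subject = new BehaviorSubject<Envelope<string>>({ value: 'initial' });
const handled: string[] = [];

subject.asObservable().subscribe(item => {
  if ('error' in item) {
    handled.push(`onError: ${item.error.message}`); // the stream stays alive
  } else {
    handled.push(`onSuccess: ${item.value}`);
  }
});

subject.next({ value: 'somevalue' });
subject.next({ error: new Error('failed') });
subject.next({ value: 'another value' });
console.log(handled);
```

This gives the onSuccess/onError behaviour asked for in the question: the error branch can run any number of times without stopping later values.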

Upvotes: 8
