Aditya Jain
Aditya Jain

Reputation: 1095

Handle Error in RxJs flatMap stream and continue processing

I am using RxJs in an Angular 2 application to fetch data from an API for multiple pages in parallel and save any failed requests for future re-try.

To do so, I want to catch errors generated by flatMap-ing http get requests (code below) and continue with further stream processing. In case of error, my current solution causes stream to discontinue.

Rx.Observable.range(1, 5)
   .flatMap(pageNo => {
              params.set('page', ''+pageNo);
              return this.http.get(this.API_GET, params)
                        .catch( (err) => {
                                  //save request
                                  return Rx.Observable.throw(new Error('http failed')); 
                          });
    })
    .map((res) => res.json());  

Let's say in above example, HTTP request for page 2 and 3 fails. I want to handle error (save failed request later retry) for both these request and let other requests continue and get mapped to json().

I tried using .onErrorResumeNext instead of catch, but I am unable to make this work.

Upvotes: 7

Views: 9589

Answers (1)

Olaf Horstmann
Olaf Horstmann

Reputation: 16882

Inside your catch, don't return an Observable.throw, then it should continue the stream as desired.

If you want to propagate the information to the outer stream, you could use an return Observable.of("Error: Foo.Bar"); for example.

Or log the error inside the catch and return an Observable.empty() to have the outer stream basically ignore the error.

In other words, just chain this:

.catch(error => Rx.Observable.of(error));

const stream$ = Rx.Observable.range(1, 5)
    .flatMap(num => {
      return simulateRest(num)
             .catch(error => {
                 console.error(error);
                 return Rx.Observable.empty();
             });
      });
             
stream$.subscribe(console.log);

// mocking-fn for simulating an error
function simulateRest(num) {
    if (num === 2) {
        return Rx.Observable.throw("Error for request: " + num);
    }
  
    return Rx.Observable.of("Result: " + num);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Upvotes: 8

Related Questions