Reputation: 1781
The method retryWhen returns a Subject. How can I merge its next value with the main stream?
dataStream.
  retryWhen(errors => {
    return errors.
      delay(1000).
      take(3).
      concat(Rx.Observable.throw(errors));
  }).
  subscribe(
    x => console.log('onNext:', x),
    e => {
      e.subscribe(function (value) {
        console.log('err', value);
      });
    },
    _ => console.log('onCompleted'));
Right now it works, but I have to subscribe to e in the error handler to get the value of the error thrown by dataStream. Is it possible to simplify this?
Update:
I would like to write a resilient "data-access routine". Currently it retries 3 times with a delay of 1 second and, if the error persists, it should throw the error. The problem is that it doesn't throw an error; it throws a Subject, and I have to subscribe to it to get the error.
let getPromiseStream = function (endpoint) {
  return Rx.Observable.
    just(endpoint).
    flatMap(requestUrl => {
      return _this.$http.get(requestUrl);
    });
};
let itemsStream = getPromiseStream('/items');
let locksStream = getPromiseStream('/error');
let usersStream = getPromiseStream('/users');
let favsStream = getPromiseStream('/favs');
let dataStream = Rx.Observable.
  zip(
    itemsStream,
    locksStream,
    usersStream,
    favsStream,
    (items, locks, users, favs) => {
      return {items: items.data, locks: locks.data, users: users.data, favs: favs.data};
    });
Upvotes: 0
Views: 1788
Reputation: 18663
retryWhen can be a little difficult to use (I had to look it up again to remember its behavior). Essentially, retryWhen takes all the errors that you receive from the source Observable and converts them into onNext calls. The function you pass in receives that Observable of errors and lets you work with the events passing through it. If you want to continue retrying, simply pass the event through. If the stream should fail, convert the onNext into an onError; if you just want it to stop retrying, convert it into an onCompleted.
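As a rough illustration of those three options, here is a minimal sketch of three notifier functions. The names retryAlways, retryTwiceThenComplete and retryTwiceThenFail are hypothetical, and the code assumes the RxJS 4 global Rx used elsewhere in this thread.

```javascript
// Hypothetical notifier functions to pass to retryWhen. Each one receives
// the Observable of errors. Assumes the RxJS 4 global `Rx`.

// 1. Keep retrying: every error passes through unchanged as an onNext.
const retryAlways = errors => errors;

// 2. Stop retrying quietly: take(2) completes the notifier after the
//    second error, which, per the description above, ends the retries.
const retryTwiceThenComplete = errors => errors.take(2);

// 3. Give up loudly: the first two errors pass through (two retries),
//    the third is converted into an onError for the subscriber.
const retryTwiceThenFail = errors =>
  errors.flatMap((err, index) =>
    index < 2 ? Rx.Observable.just(err) : Rx.Observable.throw(err));

// Usage sketch: dataStream.retryWhen(retryTwiceThenFail).subscribe(...)
```

In practice you would probably add a delay to each of these so that the retries are not fired back to back.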
So, on to your code: you want to retry with a delay a certain number of times, and then error the whole thing out if it really is more than just a transient network error.
dataStream
  .retryWhen(errors => {
    return errors
      .flatMap((err, count) => {
        return count < 3 ?
          //This will just get flattened out and passed through
          Rx.Observable.just(err) :
          //After 3 retries (i.e. on the 4th error) it is time to throw
          //This error will get passed all the way to the final
          //onError method if it isn't caught along the way
          Rx.Observable.throw(err);
      })
      .delay(1000);
  })
  .subscribe(
    x => console.log('onNext:', x),
    //e is now the original error, not a Subject
    e => console.log('err', e),
    _ => console.log('onCompleted'));
Additional note: I would suggest that you put the retryWhen on each Observable that feeds into your zip rather than on the whole thing. In the latter case, a failure from one will retry all the source Observables, not just the one that failed.
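One way to apply the retry per source could look like the sketch below. The helper name retryEach is hypothetical, and notifier stands for whatever function you would otherwise pass to retryWhen on the zipped stream.

```javascript
// Hypothetical helper: attaches the same retryWhen notifier to every
// source, so that a failing source only resubscribes to itself.
function retryEach(sources, notifier) {
  return sources.map(source => source.retryWhen(notifier));
}

// Usage sketch with the names from the question (not executed here):
// let guarded = retryEach(
//   [itemsStream, locksStream, usersStream, favsStream], notifier);
// let dataStream = Rx.Observable.zip(
//   guarded[0], guarded[1], guarded[2], guarded[3],
//   (items, locks, users, favs) => {
//     return {items: items.data, locks: locks.data,
//             users: users.data, favs: favs.data};
//   });
```

With this arrangement only the failing request (for example the '/error' endpoint) is reissued, while the results of the other requests are kept.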
Upvotes: 1