Brett
Brett

Reputation: 1781

How to merge the value of a Subject "next" into the main stream with Rxjs

The method retryWhen returns a Subject, how can I merge it's next value with the main stream ?

dataStream.
    retryWhen(errors => {
        return errors.
            delay(1000).
            take(3).
            concat(Rx.Observable.throw(errors));
    }).
    subscribe(
        x => console.log('onNext:', x),
        e => {
            e.subscribe(function (value) {
                console.log('err', value);
            });
        },
        _ => console.log('onCompleted'));

Right now it works but I have to subscribe to the e in the error handler to get the value of the error thrown by dataStream. Is it possible to simplify this ?

updates:

I would like to code a "data-access routine" with resilience. Currently It will retry 3 times with a delay of 1sec and in the case of an error, It will throw the "error". The problem is it doesn't throw an error it throws a "Subject" and I have to subscribe to it to get the error.

let getPromiseStream = function (endpoint) {
    return Rx.Observable.
        just(endpoint).
        flatMap(requestUrl => {
            return _this.$http.get(requestUrl);
        });
};

let itemsStream = getPromiseStream('/items');
let locksStream = getPromiseStream('/error');
let usersStream = getPromiseStream('/users');
let favsStream = getPromiseStream('/favs');

let dataStream = Rx.Observable.
            zip(
            itemsStream,
            locksStream,
            usersStream,
            favsStream,
            (items, locks, users, favs) => {
                return {items: items.data, locks: locks.data, users: users.data, favs: favs.data};
            });

Upvotes: 0

Views: 1788

Answers (1)

paulpdaniels
paulpdaniels

Reputation: 18663

retryWhen can be a little difficult to use (I had to look it up again to remember its behavior). Essentially retryWhen will take all errors that you receive from the source Observable and convert them into onNext calls. The function you pass in takes in that Observable and then lets you play with the events passing through. If you want to continue retrying it should simply pass the event through. If it should error then you need to convert the onNext into a onError or if you just want it to stop trying then you should convert it into an onCompleted.

So onto your code you want to delay a certain number of times and then error the whole thing out if it really is more than just a transient network error.

dataStream.
    retryWhen(errors => {
        return errors
            .flatMap((err, count) => {
              return count < 3 ? 
                //This will just get flattened out and passed through
                Rx.Observable.just(err) :
                //If you have received 3 errors it is time to throw
                //This error will get passed all the way to the final
                //onError method if it isn't caught along the way
                Rx.Observable.throw(err);
            })
            .delay(1000);
    })
    .subscribe(
        x => console.log('onNext:', x),
        e => console.log('err', value),
        _ => console.log('onCompleted'));

Additional note: I would suggest that you put the retryWhen on each Observable that feeds into your zip rather than on the whole thing. As in the latter case a failure from one will retry all the source Observables not just the one that failed.

Upvotes: 1

Related Questions