Reputation: 8655
I am trying to employ a retry pattern with increasing delay intervals in my service calls (actually @Effects in ngrx/store). I have managed to come up with working code for one call (even if it looks unoptimized, I don't want to focus on that in my question), and I would now like to extract it into a custom Observable operator and reuse it in all my service calls.
I am at a loss as to how to design the API/usage for the new operator and how to get TypeScript to recognize it.
The code below certainly does not work; it probably has a multitude of problems.
So, now I have a call/Effect as follows:
@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      // ...and this part would have to be extracted:
      .retryWhen(attempts => Observable
        .zip(attempts, Observable.range(1, 5))
        .flatMap((n, i) => {
          if (i < 4) {
            return Observable.timer(1000 * i);
          } else {
            throw(n);
          }
        })
      )
  })
  .catch(err => Observable.of(new LoadDataFailed()));
What I am after is being able to reuse the retry part in other effects, with a pattern similar to the one below:
@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      .retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts))
      // or maybe (that's my design question) this instead of the line above:
      // .retryOrThrow(attempts, maxAttempts)
  })
  .catch(err => Observable.of(new LoadDataFailed()));
For simplicity, we could assume that the delay callback pattern (i * 1000) would be constant for the entire app.
The code below is my attempt, but it obviously does not work.
declare module 'rxjs/Observable' {
  interface Observable<T> {
    retryOrThrow<T>(attempts: any, max: number): Observable<T>;
  }
}
Observable.prototype.retryOrThrow = function(attempt, max) {
  console.log('retryOrThrow called');
  return Observable.create(subscriber => {
    const source = this;
    const subscription = source.subscribe(() => {
      // important: catch errors from user-provided callbacks
      try {
        subscriber
          .zip(attempt, Observable.range(1, max + 1))
          .flatMap((n, i) => {
            console.log(n, i);
            if (i < max) {
              return Observable.timer(1000 * i);
            } else {
              throw(n);
            }
          });
      } catch (err) {
        subscriber.error(err);
      }
    },
    // be sure to handle errors and completions as appropriate and send them along
    err => subscriber.error(err),
    () => subscriber.complete());
    // to return now
    return subscription;
  });
};
Updated service call:
getMocky(){
  const u = Math.random();
  const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
  const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
  return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
}
Upvotes: 2
Views: 1268
Reputation: 1682
I can't answer your question directly since I don't know how to extend rxjs with a custom operator.
Fortunately in your case, you (and I) don't need to know. What you are really asking is how to pre-define a chain of operators to reuse in multiple places.
All you need is the let operator, and it is quite simple to use.
First extract the logic you want to reuse into a function that takes your parameters and returns another function, one that maps a source observable to the resulting observable:
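// Returns a function suitable for .let(): it takes the source observable
// and returns it with the retry logic attached.
function retryOrThrow(maxAttempts, timeout) {
  return (source) =>
    source
      .retryWhen(attempts => Observable
        // pair each error with a counter; the range caps the number of pairs
        .zip(attempts, Observable.range(1, maxAttempts + 1))
        // zip emits [error, counter] pairs; i is flatMap's zero-based index
        .flatMap(([error], i) => {
          if (i < maxAttempts) {
            // wait, then emit to trigger a resubscription (first wait is 0 ms)
            return Observable.timer(timeout * i);
          } else {
            // out of retries: rethrow the original error, not the zip pair
            throw error;
          }
        })
      );
}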
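A note on the resulting schedule: flatMap passes the zero-based emission index as its second argument, so the i in the function above starts at 0 rather than at the first value of the range. The sketch below only does the arithmetic and does not use RxJS; retryDelays is a hypothetical helper introduced here for illustration.

```typescript
// Hypothetical helper (not part of RxJS): lists the wait in milliseconds
// before each retry, mirroring `timeout * i` for i = 0 .. maxAttempts - 1.
function retryDelays(maxAttempts: number, timeout: number): number[] {
  const delays: number[] = [];
  for (let i = 0; i < maxAttempts; i++) {
    delays.push(timeout * i);
  }
  return delays;
}

// Schedule for retryOrThrow(5, 1000): five retries, the first one immediate.
const schedule = retryDelays(5, 1000); // [0, 1000, 2000, 3000, 4000]
```

If the first retry should also wait, using timeout * (i + 1) inside the flatMap callback would be one way to shift the schedule.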
Use the function in your effect using let:
@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      // inject the pre-defined logic with the desired parameters
      .let(retryOrThrow(5, 1000))
  })
  .catch(err => Observable.of(new LoadDataFailed()));
The easiest way to understand what let does is to look at its source code. It's really simple, because all it does is apply a given function to the source observable.
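As a rough illustration of that idea (this is not the actual RxJS source), here is a minimal sketch. Source is a hypothetical stand-in for Observable that wraps a plain array, and double is a made-up reusable step; the only point is that let hands the whole source to the function and returns whatever the function returns.

```typescript
// Hypothetical stand-in for an Observable-like class; not RxJS itself.
class Source<T> {
  constructor(readonly values: T[]) {}

  map<R>(project: (value: T) => R): Source<R> {
    return new Source(this.values.map(project));
  }

  // Same idea as let: apply `fn` to this source and return its result.
  let<R>(fn: (source: Source<T>) => Source<R>): Source<R> {
    return fn(this);
  }
}

// A reusable chain of operators, defined once as a plain function.
const double = (source: Source<number>): Source<number> =>
  source.map(x => x * 2);

// Equivalent to calling double(new Source([1, 2, 3])) directly.
const doubled = new Source([1, 2, 3]).let(double);
```

Because the reusable piece is just a function, nothing has to be added to a prototype and no module augmentation is needed for TypeScript to accept it.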
Upvotes: 4