user776686
user776686

Reputation: 8655

How do I properly extend rxjs/Observable with my first custom operator

I am trying to employ a retry pattern in my service calls (actually: @Effects in ngrx/store) with increased delay intervals. Since I managed to come up with a working code for one call (even if it looks unoptimized I don't want to focus on this in my question), I would now like to extract it into a custom Observable operator, and use it repeatedly in all my service calls.

I am blank as per how to design the API/usage for the new operator and how to make it recognized by TypeScript.

The code below certainly does not work, because it probably accumulates multitude of problems.

So, now I have a call/Effect as follows:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))

        // ...and this part would have to be extracted: 
        .retryWhen(attempts => Observable
          .zip(attempts, Observable.range(1, 5))
          .flatMap((n, i) => {
            if (i < 4) {
              return Observable.timer(1000 * i);
            } else {
              throw(n);
            }
          })
        )
    })
    .catch(err => Observable.of(new LoadDataFailed()));

and what I am after, is being able to reuse the retry part in other effects, and have pattern similar to below:

  @Effect()
  loadData$: Observable<Action> = this.actions$
    .ofType(ActionTypes.LOAD_DATA)
    .pluck('payload')
    .switchMap(params => {
      return this.myService.getData(params)
        .map(res => new LoadDataCompleteAction(res))
        .retryWhen(attempts => Observable.retryOrThrow(attempts, maxAttempts)

         // or maybe - that's my design question
         .retryOrThrow(attempts, maxAttempts)
    })
    .catch(err => Observable.of(new LoadDataFailed()));

For simplicity, we could assume that the delay callback pattern (i * 1000) would be constant for entire app.

The below code is my attempt, but it obviously does not work.

declare module 'rxjs/Observable' {
  interface Observable<T> {
    retryOrThrow<T>(attempts: any, max: number): Observable<T>;
  }     
}

Observable.prototype.retryOrThrow = function(attempt, max) {
  console.log('retryOrThrow called');

  return Observable.create(subscriber => {
    const source = this;
    const subscription = source.subscribe(() => {
        // important: catch errors from user-provided callbacks
        try {
          subscriber
            .zip(attempt, Observable.range(1, max + 1))
            .flatMap((n, i) => {
              console.log(n, i);
              if (i < max) {
                return Observable.timer(1000 * i);
              } else {
                throw(n);
              }
            });
        } catch (err) {
          subscriber.error(err);
        }
      },
      // be sure to handle errors and completions as appropriate and send them along
      err => subscriber.error(err),
      () => subscriber.complete());

    // to return now
    return subscription;
  });
};
  1. I am not sure how to design the API for the new operator, what syntax would suit best here.
  2. I don't know how to properly declare the new operator and the Observable namespace or module, for TypeScript to recognize new stuff.

Updated service call:

  getMocky(){
    const u = Math.random();

    const okUrl = 'http://www.mocky.io/v2/58ffadf71100009b17f60044';
    const erUrl = 'http://www.mocky.io/v2/58ffae7f110000ba17f60046';
    return u > 0.6 ? this.http.get(okUrl) : this.http.get(erUrl);
  }

Upvotes: 2

Views: 1268

Answers (1)

mtx
mtx

Reputation: 1682

I can't answer your question directly since I don't know how to extend rxjs with a custom operator.

Fortunately in your case, you (and I) don't need to know. What you are really asking is how to pre-define a chain of operators to reuse in multiple places.

All you need is the let-Operator and it is quite simple to use.

First extract the logic you want to reuse into a function that returns an observable:

function retryOrThrow(maxAttempts, timeout) {
  return (source) =>
    source
      .retryWhen(attempts => Observable
        .zip(attempts, Observable.range(1, maxAttempts + 1))
        .flatMap((n, i) => {
          if (i < maxAttempts) {
            return Observable.timer(timeout * i);
          } else {
            throw (n);
          }
        })
      );
}

Use the function in your effect using let:

@Effect()
loadData$: Observable<Action> = this.actions$
  .ofType(ActionTypes.LOAD_DATA)
  .pluck('payload')
  .switchMap(params => {
    return this.myService.getData(params)
      .map(res => new LoadDataCompleteAction(res))
      // inject the pre-defined logic with the desired parameters
      .let(retryOrThrow(5, 1000))
    })
    .catch(err => Observable.of(new LoadDataFailed()));

The easiest way to understand what let does is to look at it's source-code. It's really simple, because all it does is apply a given function to the source-observable.

Upvotes: 4

Related Questions