Peter Albert

Reputation: 17475

Combine RxJS operators into new operator using TypeScript

I frequently find myself adding the same sequence of operators to observables, e.g.

observable$
  .do(x => console.log('some text', x))
  .publishReplay()
  .refCount();

I'm looking for a way to combine these three operators into a small reusable operator (e.g. .cache('some text')) that I can chain onto any observable. How can I define this in TypeScript, so that I can import rxjs/Observable and this operator the same way I import the built-in rxjs operators?

Upvotes: 11

Views: 3221

Answers (2)

nPn

Reputation: 16728

cartant's answer works well and answers the question that was asked (How can I define this in TypeScript, so that I could import rxjs/Observable and this operator, like I do with rxjs operators?)

I recently discovered the let operator. If you don't actually need the function to be implemented as an operator, let will still allow you to DRY up your code.
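To make the idea concrete, here is a minimal sketch of what let does: it hands the whole stream to a function and returns whatever that function returns. MiniStream, letLike and logged are hypothetical names invented for this illustration; MiniStream is a toy, eager stand-in and not the RxJS Observable, so none of this is RxJS's actual implementation.

```typescript
// Toy stand-in for an observable; NOT the RxJS Observable class.
class MiniStream<T> {
  constructor(private readonly values: T[]) {}

  map<R>(fn: (value: T) => R): MiniStream<R> {
    return new MiniStream(this.values.map(fn));
  }

  // Same shape as the let operator: pass the whole stream to fn, return its result.
  letLike<R>(fn: (source: MiniStream<T>) => MiniStream<R>): MiniStream<R> {
    return fn(this);
  }

  toArray(): T[] {
    return this.values.slice();
  }
}

// A reusable "operator" is just a function from stream to stream.
// It records each value in sink (instead of console.log) and passes it through.
function logged<T>(text: string, sink: string[]): (source: MiniStream<T>) => MiniStream<T> {
  return source => source.map(x => {
    sink.push(`${text} ${x}`);
    return x;
  });
}

const seen: string[] = [];
const doubled = new MiniStream([1, 2, 3])
  .letLike(logged<number>("some text", seen))
  .map(x => x * 2)
  .toArray();
```

The point of the design is that logged needs no changes to the stream class at all: any function with the signature (source) => stream can be dropped into the chain.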

I was starting to implement an Angular 2 service to interface with my Rails backend, and I knew that most of my API calls would look very similar, so I wanted to put as much of the common logic as possible into one function.

Almost all the calls will do the following:

  1. retry on an error (my function below needs more work on that front)
  2. map the HTTP response into a locally defined TypeScript class (via json-typescript-mapper)
  3. handle errors

Here is an example of how I use the rxjs let operator to pass my HTTP responses through a common function (handleResponse).

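  // Returns a function suitable for .let(): it takes the raw HTTP observable and
  // returns one that retries, deserializes into klass, and converts errors.
  handleResponse<T>({klass, retries = 0}: {klass: any, retries?: number}): (source: Observable<Response>) => Observable<T> {
    return (source: Observable<Response>): Observable<T> => {
      return source.retry(retries)
        .map((res) => this.processResponse(klass, res))
        .catch((res) => this.handleError(res));
    };
  }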

  processResponse(klass, response: Response) {
    return deserialize(klass, response.json());
  }

  handleError(res: Response) {
    const error = new RailsBackendError(res.status, res.statusText);
    return  Observable.throw(error);
  }

  getUserList({page=1,perPage=30,retry=0}: { page?:number, perPage?:number, retry?:number }={}) : Observable<UserList> {
    const requestURL = `/api/v1/users/?${this.apiTokenQueryString}&page=${page}&per_page=${perPage}`;
    // Forward the caller's retry count; without this the retry parameter is ignored.
    return this.http.get(requestURL).let(this.handleResponse<UserList>({klass: UserList, retries: retry}));
  }

Upvotes: 3

cartant

Reputation: 58400

To implement the operator you have described, create a cache.ts file with the following content:

import { Observable } from "rxjs/Observable";
import "rxjs/add/operator/do";
import "rxjs/add/operator/publishReplay";

// Compose the operator:

function cache<T>(this: Observable<T>, text: string): Observable<T> {
  return this
    .do(x => console.log(text, x))
    .publishReplay()
    .refCount();
}

// Add the operator to the Observable prototype:

Observable.prototype.cache = cache;

// Extend the TypeScript interface for Observable to include the operator:

declare module "rxjs/Observable" {
  interface Observable<T> {
    cache: typeof cache;
  }
}

And consume it like this:

import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/of";
import "./cache";

let cached = Observable.of(1).cache("some text");
cached.subscribe(x => console.log(x));
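The two TypeScript mechanisms this relies on, a function with a typed this parameter and an interface declaration that merges with the class, can be tried in isolation. The sketch below uses a hypothetical toy class, TinyStream, and a hypothetical method, tagged, rather than the RxJS Observable. Because everything is in one file, a plain interface declaration is enough here; the declare module wrapper is only needed when the class is imported from another module, as Observable is.

```typescript
// Toy stand-in for an observable; NOT the RxJS Observable class.
class TinyStream<T> {
  constructor(readonly values: T[]) {}
}

// A free function with a typed `this` parameter, so it can be called as a method.
function tagged<T>(this: TinyStream<T>, text: string): TinyStream<string> {
  return new TinyStream(this.values.map(x => `${text}: ${x}`));
}

// Declaration merging: tell the compiler that instances have the method.
interface TinyStream<T> {
  tagged: typeof tagged;
}

// Runtime patch: actually put the function on the prototype.
TinyStream.prototype.tagged = tagged;

const labels = new TinyStream([1, 2]).tagged("item").values;
```

The interface only affects type checking and the prototype assignment only affects runtime behavior, so both halves are needed.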

Upvotes: 25
