Raven

Reputation: 1523

Observable patterns for caching and deferred data retrieval

I'm trying to create a caching function in Angular using RxJS Observables. I originally wrote this method using AngularJS's $q deferred promises. Observables and RxJS are new to me, and I still find this way of working somewhat confusing.

This is my current implementation of a getOrCreate caching function: it retrieves a single value for a key from storage (this.get()) and, if the value isn't there, retrieves it from elsewhere (fetcher).

Assume fetcher is a slower data source than this.get(). Multiple requests for the same key could fire while we're still retrieving from this.get(), so I added an aggregator: only a single observable is created for multiple requests of the same key.
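For comparison, the aggregation described above can be sketched with plain promises, which is closer to the $q approach this started from. This is a hypothetical model, not the RxJS code in this post: PromiseCache, pendingCount and the Map-based store are illustrative stand-ins for this.get()/this.put(), and the sketch assumes a cache miss is signalled by a rejected lookup.

```typescript
// Hypothetical promise-based model of getOrCreate (not the RxJS code in this
// post): a Map stands in for storage, and concurrent calls share one promise.
type Fetcher<T> = () => Promise<T>;

class PromiseCache {
  // Fast storage, standing in for this.get()/this.put()
  private readonly store = new Map<string, unknown>();
  // At most one pending lookup per key
  private readonly inFlight = new Map<string, Promise<unknown>>();

  private get<T>(key: string): Promise<T> {
    // Assumption: a miss is reported as a rejection, like an erroring this.get()
    return this.store.has(key)
      ? Promise.resolve(this.store.get(key) as T)
      : Promise.reject(new Error(`cache miss: ${key}`));
  }

  private put<T>(key: string, value: T): void {
    this.store.set(key, value);
  }

  getOrCreate<T>(key: string, fetcher: Fetcher<T>): Promise<T> {
    const pending = this.inFlight.get(key);
    if (pending) {
      // A lookup for this key is already running: share it
      return pending as Promise<T>;
    }

    const unregister = () => {
      this.inFlight.delete(key);
    };

    const lookup: Promise<T> = this.get<T>(key)
      // Cache miss: fall back to the slower fetcher and store its result
      .catch(() =>
        fetcher().then((fetched) => {
          this.put(key, fetched);
          return fetched;
        }),
      )
      // Remove the in-flight entry once the lookup settles, success or failure
      .then(
        (value) => {
          unregister();
          return value;
        },
        (err: unknown) => {
          unregister();
          throw err;
        },
      );

    this.inFlight.set(key, lookup);
    return lookup;
  }

  pendingCount(): number {
    return this.inFlight.size;
  }
}
```

Because the entry is removed once the lookup settles, a later call for the same key starts a fresh lookup, which then finds the value in the store instead of calling the fetcher again.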

protected observableCache : {[key: string] : Observable<any>} = {};

get<T>(key : string): Observable<T> { /* Async data retrieval */ }

getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
  const keyHash = this.hash(key);

  // Check if an observable for the same key is already in flight
  if (this.observableCache[keyHash]) {
    return this.observableCache[keyHash];
  } else {

    let observable : Observable<T>;

    this.get(key).subscribe(

      // Cache hit
      (result) => { observable = Observable.of(result); },

      // Cache miss. Retrieving from fetching while creating entry
      () => {
        fetcher().subscribe((fetchedResult) => {
          if(fetchedResult) {
            this.put(key, fetchedResult);
          }
          observable = Observable.of(fetchedResult);
        });
      }
    );


    // Register and unregister in-flight observables
    this.observableCache[keyHash] = observable;
    observable.subscribe(() => {
      delete this.observableCache[this.hash(key)];
    });

    return observable;
  }
}

This is my current version of that code, but it doesn't look like I'm handling the asynchronous parts properly.

Can someone help me figure out which Observable patterns I should be using?

Upvotes: 3

Views: 1098

Answers (1)

olivarra1

Reputation: 3409

I think this might work. Here it is rewritten:

import { Observable } from 'rxjs/Observable';
import { ConnectableObservable } from 'rxjs/observable/ConnectableObservable';
import { Observer } from 'rxjs/Observer';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/publish';

getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
    const keyHash = this.hash(key);

    // Check if an observable for the same key is already in flight
    if (this.observableCache[keyHash]) {
        return this.observableCache[keyHash];
    }

    const observable : ConnectableObservable<T> = this.get<T>(key)
        // catch handles an error from the source observable (here: a cache miss)
        // by switching to the observable returned from this callback
        .catch(() => this.fetchFromFetcher(key, fetcher))
        .publish();

    // Register the in-flight observable and unregister it once it emits, fails or completes
    const unregister = () => { delete this.observableCache[keyHash]; };
    this.observableCache[keyHash] = observable;
    observable.subscribe(unregister, unregister, unregister);
    observable.connect();

    return observable;
}

fetchFromFetcher<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
    // Subscribe to fetcher, store each result with this.put(...) and pass it on unchanged
    return Observable.create((observer: Observer<T>) => {
        // Returning the inner subscription means unsubscribing also cancels the fetch
        return fetcher().subscribe(
            fetchedResult => {
                this.put(key, fetchedResult);
                observer.next(fetchedResult);
            },
            err => observer.error(err),
            () => observer.complete()
        );
    });
}

Explanations:

  1. Observables are very different from promises. Both are used for asynchronous work and they share some similarities, but they behave quite differently.
  2. As this.get(...) appears to be asynchronous, your observable variable isn't assigned until it yields a value, so when you store it in your cache it is still undefined, and the observable.subscribe(...) call right after it fails.
  3. A great thing about observables (and the main difference from promises) is that you can define a stream before anything gets executed. In my solution, nothing runs until I call observable.connect(). This avoids a lot of nested .subscribe() calls.
  4. So, in my code I take the this.get(key) stream and tell it that if it fails (.catch(...)) it must fetch the result, and once that's fetched, put it into your cache (this.put(key, fetchedResult)).
  5. Then I publish() this observable. This makes it behave more like a promise: it becomes "hot". All subscribers get their values from the same stream, instead of each subscription creating a new stream that starts from scratch.
  6. Then I store it in the in-flight observable cache and make it remove itself from the cache once it delivers its result.
  7. Finally, I call .connect(). This is only needed after publish(): it is what actually subscribes to the original stream, executing everything you set up.
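Point 2 is easy to reproduce without RxJS. In this minimal sketch, getLater is a hypothetical stand-in for an asynchronous this.get(), readTooEarly mirrors the question's assign-inside-the-callback pattern, and readWhenReady (also hypothetical) returns a promise instead; a promise is used only to keep the example dependency-free, whereas the answer's RxJS version returns the published stream.

```typescript
// Hypothetical stand-in for an asynchronous this.get(): it delivers its value
// on a later tick, after the call itself has already returned.
function getLater(key: string, onValue: (value: string) => void): void {
  Promise.resolve().then(() => onValue(`value for ${key}`));
}

// Mirrors the shape of the question's code: assign inside the callback,
// then read the variable immediately afterwards.
function readTooEarly(key: string): string | undefined {
  let result: string | undefined = undefined;
  getLater(key, (value) => {
    result = value; // runs only after readTooEarly has returned
  });
  return result; // still undefined here, like `observable` in the question
}

// Instead, hand back something that will produce the value,
// and let the caller react when it arrives.
function readWhenReady(key: string): Promise<string> {
  return new Promise((resolve) => getLater(key, resolve));
}
```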

To be clear, because this is a common mistake when coming from promises: in Angular, if you define a stream as:

let myRequest = this.http.get("http://www.example.com/")
    .map((result) => result.json());

The request is not sent yet. And every time you call myRequest.subscribe(), a new request is made to the server; it won't reuse the result of the first subscription. That's why .publish() is so useful: when you call .connect(), it creates a single subscription that triggers the request and shares each result it receives (observables are streams and can emit many results) with everyone subscribed to the published observable at that moment.
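The difference between a cold stream and a published one can be modelled in a few lines. This is a toy, not RxJS: ColdSource, Published and the request counter are hypothetical names, and the producer emits synchronously to keep things simple. It shows that each plain subscribe() re-runs the producer (one "request" each), while publish() only registers subscribers and connect() runs the producer once for all of them.

```typescript
// Toy model (not RxJS) of cold vs. published streams.
type Observer<T> = (value: T) => void;

class ColdSource<T> {
  private readonly producer: (emit: Observer<T>) => void;

  constructor(producer: (emit: Observer<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: Observer<T>): void {
    // Each subscription starts the producer from scratch (a new "request")
    this.producer(observer);
  }

  publish(): Published<T> {
    return new Published(this);
  }
}

class Published<T> {
  private readonly source: ColdSource<T>;
  private readonly observers: Observer<T>[] = [];

  constructor(source: ColdSource<T>) {
    this.source = source;
  }

  subscribe(observer: Observer<T>): void {
    // Subscribers only register here; nothing runs yet
    this.observers.push(observer);
  }

  connect(): void {
    // One subscription to the source, shared by every registered observer
    this.source.subscribe((value) => {
      for (const observer of this.observers) {
        observer(value);
      }
    });
  }
}
```

As with a plain publish(), a subscriber that registers after connect() has already delivered a value does not receive it in this toy. RxJS 5 also provides publishLast() and publishReplay() for cases where late subscribers need the value.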

Upvotes: 3
