Reputation: 1523
I'm trying to create a caching function in Angular using RxJS Observables. I originally wrote this method with AngularJS's $q deferred promises. Observables and RxJS are new to me, and I still find this way of working somewhat confusing.
This is my current implementation of a getOrCreate caching function. It retrieves a single value for a key from storage (this.get()) and, if the value is not in there, retrieves it from elsewhere (fetcher).
Assume fetcher is a slower data source than this.get(). Multiple requests for the same key could fire while we're still retrieving from this.get(), so I put in an aggregator: only a single observable is created for multiple requests of the same key.
    protected observableCache : {[key: string] : Observable<any>} = {};

    get<T>(key : string): Observable<T> { /* Async data retrieval */ }

    getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
        const keyHash = this.hash(key);

        // Check if an observable for the same key is already in flight
        if (this.observableCache[keyHash]) {
            return this.observableCache[keyHash];
        } else {
            let observable : Observable<T>;

            this.get(key).subscribe(
                // Cache hit
                (result) => { observable = Observable.of(result); },
                // Cache miss. Retrieving from fetching while creating entry
                () => {
                    fetcher().subscribe((fetchedResult) => {
                        if(fetchedResult) {
                            this.put(key, fetchedResult);
                        }
                        observable = Observable.of(fetchedResult);
                    });
                }
            );

            // Register and unregister in-flight observables
            this.observableCache[keyHash] = observable;
            observable.subscribe(()=> {
                delete this.observableCache[this.hash(key)];
            });

            return observable;
        }
    }
This is my current version of the code, but it doesn't look like I'm handling the async part properly: return observable fires before observable = Observable.of(result); runs, because this.get() is still in flight. Can someone help me find which Observer patterns should be used here?
Upvotes: 3
Views: 1098
Reputation: 3409
I think this might work. Rewritten as:
    getOrCreate<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
        const keyHash = this.hash(key);

        // Check if an observable for the same key is already in flight
        if (this.observableCache[keyHash]) {
            return this.observableCache[keyHash];
        }

        let observable : ConnectableObservable<T> = this.get(key)
            // catch handles an error from the source observable by replacing
            // the source with the Observable returned from this callback
            .catch(() => {
                // Cache miss. Retrieving from fetcher while creating entry
                return this.fetchFromFetcher(key, fetcher);
            })
            .publish();

        // Register and unregister in-flight observables
        this.observableCache[keyHash] = observable;
        observable.subscribe(()=> {
            delete this.observableCache[keyHash];
        });

        observable.connect();

        return observable;
    }

    fetchFromFetcher<T>(key : string, fetcher: () => Observable<T>) : Observable<T> {
        // Here we create a stream that subscribes to fetcher to use `this.put(...)`, returning the original value when done
        return Observable.create(observer => {
            // Returning the inner subscription lets it be torn down on unsubscribe
            return fetcher().subscribe(fetchedResult => {
                this.put(key, fetchedResult);
                observer.next(fetchedResult);
            },
            err => observer.error(err),
            () => observer.complete());
        });
    }
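For readers coming from $q, the same control flow (fall back to the fetcher on a miss, store the result, and share one in-flight request per key) can be sketched with plain Promises. This is only an analogy under assumptions, not the RxJS solution above: PromiseCache, its in-memory storage map, and cheapFetcher are hypothetical names made up for illustration.

```typescript
// Hypothetical promise-based analogue of getOrCreate; not RxJS code.
class PromiseCache {
  private storage = new Map<string, unknown>();
  private inFlight = new Map<string, Promise<unknown>>();

  // Rejects on a cache miss, mirroring this.get() erroring in the answer.
  get<T>(key: string): Promise<T> {
    return this.storage.has(key)
      ? Promise.resolve(this.storage.get(key) as T)
      : Promise.reject(new Error(`miss: ${key}`));
  }

  put<T>(key: string, value: T): void {
    this.storage.set(key, value);
  }

  getOrCreate<T>(key: string, fetcher: () => Promise<T>): Promise<T> {
    const pending = this.inFlight.get(key);
    if (pending) {
      return pending as Promise<T>; // reuse the request already in flight
    }
    const request = this.get<T>(key)
      // Cache miss: fetch, store, and pass the value along
      .catch(() =>
        fetcher().then(fetched => {
          this.put(key, fetched);
          return fetched;
        })
      )
      // Unregister the in-flight entry on success and on failure
      .then(
        value => { this.inFlight.delete(key); return value; },
        err => { this.inFlight.delete(key); throw err; }
      );
    this.inFlight.set(key, request);
    return request;
  }
}

// Usage: two calls for the same key before anything has resolved.
let fetchCount = 0;
const cheapFetcher = (): Promise<string> => {
  fetchCount += 1;
  return Promise.resolve("value");
};

const cache = new PromiseCache();
const first = cache.getOrCreate("a", cheapFetcher);
const second = cache.getOrCreate("a", cheapFetcher);
// `second` is the very same promise object as `first`,
// so cheapFetcher is invoked only once when the chain settles.
```

A Promise is always shared between its consumers, which is why no extra step is needed here; with Observables that sharing has to be requested explicitly.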
Explanations:
- this.get(...) seems to be asynchronous, so your let observable isn't assigned until it yields a value. When you store it in your cache right away, it's normal that it is still unassigned (undefined).
- Build the stream by chaining operators and start it once with observable.connect(). This avoids so many nested .subscribe() calls.
- Take the this.get(key) stream and tell it that if it fails (.catch(...)) it must fetch the result, and once that's fetched, put it into your cache (this.put(key, fetchedResult)).
- Then publish() this observable. This makes it behave more like promises do: it makes it "hot". All subscribers get the values from the same stream, instead of each subscription creating a new stream that starts from zero.
- Finally, call .connect(). This is only needed because you publish() it; it's the call that actually subscribes to the original stream, executing everything you want.
To make this clear, because it's a common error when coming from Promises: in Angular, if you define a stream as:
    let myRequest = this.http.get("http://www.example.com/")
        .map((result) => result.json());
The request is not sent yet. Every time you call myRequest.subscribe(), a new request is made to the server; it won't reuse the result of the "first subscription". That's why .publish() is very useful: when you call .connect(), it creates a single subscription that triggers the request, and every result received (Observables are streams, so there can be many results) is shared with all subscriptions to the published observable. Note that a subscriber attached after a value was emitted does not receive that value; replaying it to late subscribers takes something like .publishReplay(1) instead.
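The difference can be seen without any library in a minimal sketch. Source, publish, and coldRequest below are hypothetical stand-ins written for illustration; they are not the RxJS classes, and they leave out errors, completion, and unsubscription.

```typescript
// Hypothetical minimal stand-ins; these are NOT the RxJS types.
type Observer<T> = (value: T) => void;

interface Source<T> {
  subscribe(observer: Observer<T>): void;
}

let requestCount = 0;

// "Cold" source: every subscribe() call runs the producer again,
// the way each subscribe() on an http.get() stream sends a new request.
const coldRequest: Source<string> = {
  subscribe(observer) {
    requestCount += 1;
    observer(`response #${requestCount}`);
  },
};

// publish()-like wrapper: subscribe() only registers the observer;
// the underlying source is subscribed once, when connect() is called.
function publish<T>(source: Source<T>): Source<T> & { connect(): void } {
  const observers: Observer<T>[] = [];
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    connect() {
      source.subscribe(value => observers.forEach(o => o(value)));
    },
  };
}

// Two direct subscriptions trigger two separate "requests".
const coldResults: string[] = [];
coldRequest.subscribe(v => coldResults.push(v));
coldRequest.subscribe(v => coldResults.push(v));

// Two subscriptions to the published source share a single "request".
const hotResults: string[] = [];
const published = publish(coldRequest);
published.subscribe(v => hotResults.push(v));
published.subscribe(v => hotResults.push(v));
published.connect();

console.log(coldResults); // [ 'response #1', 'response #2' ]
console.log(hotResults);  // [ 'response #3', 'response #3' ]
```

The two cold subscriptions each get their own response, while both subscribers of the published source see the single third response.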
Upvotes: 3