Reputation: 265
Please help me solve this problem.
I have a method in a service:
public list(): Observable<ILanguage[]> {
  return Observable.create((observer: Observer<ILanguage[]>) => {
    if (this._languages && this._languages.length > 0) {
      observer.next(this._languages);
      observer.complete();
    } else {
      this._http.get<ILanguage[]>(this._constants.apiUrl + '/langs').subscribe((allLanguages: ILanguage[]) => {
        this._languages = allLanguages;
        observer.next(this._languages);
        observer.complete();
      });
    }
  });
}
If the calls are spaced out, everything works correctly and the request is sent once.
But if the method is called several times almost simultaneously, several requests are sent.
I tried adding a "request sent" flag, but that did not work either:
public list(): Observable<ILanguage[]> {
  return Observable.create((observer: Observer<ILanguage[]>) => {
    if (this._languages && this._languages.length > 0) {
      observer.next(this._languages);
      observer.complete();
    } else if (!this._requestIsSend) {
      this._requestIsSend = true;
      this._http.get<ILanguage[]>(this._constants.apiUrl + '/langs').subscribe((allLanguages: ILanguage[]) => {
        this._languages = allLanguages;
        observer.next(this._languages);
        observer.complete();
      });
    }
  });
}
Upvotes: 1
Views: 2177
Reputation: 3649
There are two problems with sharing an Observable returned from a function:
- each function call returns a new Observable (fix: memoize the function)
- the Observable is cold (fix: make it hot with shareReplay)
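import * as _memoize from 'lodash/memoize';
import { shareReplay } from 'rxjs/operators';

// Method decorator: the wrapped method returns one shared, replaying Observable.
export function ShareReplayObservable(target: object, key: string, descriptor: PropertyDescriptor) {
  const originalMethod = descriptor.value;
  const getObservableFn = function (...args) {
    // shareReplay() makes all subscribers share a single subscription to the source.
    return originalMethod.apply(this, args).pipe(shareReplay());
  };
  // memoize caches the returned Observable, keyed by the first argument.
  descriptor.value = _memoize(getObservableFn);
  return descriptor;
}

@ShareReplayObservable
public list(): Observable<ILanguage[]> {
  return this._http.get<ILanguage[]>(this._constants.apiUrl + '/langs');
}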
Upvotes: 0
Reputation: 14199
Your question is a bit cryptic, but I think you are asking how to prevent repeated calls to your HTTP API. You are probably subscribing to your list() observable multiple times before the first HTTP call finishes. The cache check only prevents API calls once the first response has arrived, so near-simultaneous subscriptions each send their own request. The flag does stop the extra requests, but the later subscribers then never receive a value, because neither branch runs for them.
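To make the timing problem concrete, here is a small simulation without RxJS. FakeBackend, flush() and the callback-style list() are made up for this illustration; only the "check cache, otherwise request" logic mirrors the code in the question.

```typescript
// Simulation only: FakeBackend stands in for HttpClient and is not a real API.
type Callback<T> = (value: T) => void;

class FakeBackend {
  requestCount = 0;
  private pending: Callback<string[]>[] = [];

  // Each call counts as one HTTP request; the response arrives later via flush().
  get(cb: Callback<string[]>): void {
    this.requestCount++;
    this.pending.push(cb);
  }

  // Deliver the response to every request that is still in flight.
  flush(): void {
    const callbacks = this.pending;
    this.pending = [];
    callbacks.forEach(cb => cb(['en', 'de']));
  }
}

class LanguageService {
  private languages: string[] = [];
  private backend: FakeBackend;

  constructor(backend: FakeBackend) {
    this.backend = backend;
  }

  // Same decision logic as the question's first list(): the cache is filled
  // only when a response arrives, so it cannot stop overlapping calls.
  list(cb: Callback<string[]>): void {
    if (this.languages.length > 0) {
      cb(this.languages);
    } else {
      this.backend.get(langs => {
        this.languages = langs;
        cb(langs);
      });
    }
  }
}

const backend = new FakeBackend();
const service = new LanguageService(backend);
const results: string[][] = [];

service.list(langs => { results.push(langs); }); // cache empty: request 1
service.list(langs => { results.push(langs); }); // cache still empty: request 2
backend.flush();                                 // both responses arrive
service.list(langs => { results.push(langs); }); // served from the cache

console.log(backend.requestCount); // 2
```

The third call is free, but the first two both hit the backend because nothing remembers the request that is already in flight.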
But worry not. This caching requirement is already built into RxJS with ReplaySubject, so you don't need to implement it yourself. ReplaySubject takes a buffer size as its first parameter: the number of most recent values that are replayed to each new subscriber. Simply use 1 and it will cache the latest value of your observable (which I guess is allLanguages).
Your code can be simplified to:
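import { Observable, ReplaySubject } from 'rxjs';

// Typed as ReplaySubject so that next() can be called on it.
private languages$: ReplaySubject<ILanguage[]>;

public list(): Observable<ILanguage[]> {
  if (!this.languages$) {
    this.languages$ = new ReplaySubject<ILanguage[]>(1);
    // Only the very first call sends the HTTP request.
    this._http.get<ILanguage[]>(this._constants.apiUrl + '/langs').subscribe((allLanguages: ILanguage[]) => {
      this.languages$.next(allLanguages);
    });
  }
  return this.languages$;
}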
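If you want to try this outside Angular, a minimal sketch could look like the following. fakeGet, backend and requestCount are hypothetical stand-ins for HttpClient that I made up so the snippet runs on its own; the ILanguage shape is also assumed.

```typescript
import { Observable, ReplaySubject, Subject } from 'rxjs';

interface ILanguage { code: string; } // assumed shape, for illustration only

// Hypothetical stand-in for HttpClient.get: every subscription counts as one
// request, and the response arrives only when `backend` emits.
let requestCount = 0;
const backend = new Subject<ILanguage[]>();
const fakeGet = (): Observable<ILanguage[]> =>
  new Observable<ILanguage[]>(subscriber => {
    requestCount++;
    return backend.subscribe(subscriber);
  });

class LanguageService {
  private languages$?: ReplaySubject<ILanguage[]>;

  list(): Observable<ILanguage[]> {
    if (!this.languages$) {
      const subject = new ReplaySubject<ILanguage[]>(1);
      fakeGet().subscribe(langs => subject.next(langs));
      this.languages$ = subject;
    }
    return this.languages$;
  }
}

const service = new LanguageService();
const received: string[] = [];

// Two subscribers arrive before the response does.
service.list().subscribe(langs => { received.push('a:' + langs.length); });
service.list().subscribe(langs => { received.push('b:' + langs.length); });

backend.next([{ code: 'en' }, { code: 'de' }]); // the response arrives

// A late subscriber gets the cached value replayed immediately.
service.list().subscribe(langs => { received.push('c:' + langs.length); });

console.log(requestCount); // 1
console.log(received);     // [ 'a:2', 'b:2', 'c:2' ]
```

All three subscribers share the one ReplaySubject, so only the first list() call triggers a request.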
There is an even cleaner, less imperative solution that uses pipeable operators. The shareReplay(1) operator multicasts the source through an internal ReplaySubject(1), so all subscribers share a single HTTP request and late subscribers receive the cached value. It already returns an ordinary observable, so no refCount() is needed; refCount() is only required after publishReplay(1), which returns a connectable observable.
private languages$: Observable<ILanguage[]>;

public list(): Observable<ILanguage[]> {
  if (!this.languages$) {
    this.languages$ = this._http.get<ILanguage[]>(this._constants.apiUrl + '/langs')
      .pipe(shareReplay(1));
  }
  return this.languages$;
}
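Here is a rough, self-contained sketch of the operator-based variant with shareReplay(1). As before, fakeGet, backend and requestCount are hypothetical stand-ins for HttpClient, not real APIs, and the ILanguage shape is assumed.

```typescript
import { Observable, Subject } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

interface ILanguage { code: string; } // assumed shape, for illustration only

// Hypothetical stand-in for HttpClient.get: counts subscriptions as requests.
let requestCount = 0;
const backend = new Subject<ILanguage[]>();
const fakeGet = (): Observable<ILanguage[]> =>
  new Observable<ILanguage[]>(subscriber => {
    requestCount++;
    return backend.subscribe(subscriber);
  });

class LanguageService {
  private languages$?: Observable<ILanguage[]>;

  list(): Observable<ILanguage[]> {
    if (!this.languages$) {
      // One shared subscription to the source; the last value is cached.
      this.languages$ = fakeGet().pipe(shareReplay(1));
    }
    return this.languages$;
  }
}

const service = new LanguageService();
const received: string[] = [];

// Both subscriptions happen while the request is still in flight.
service.list().subscribe(langs => { received.push('a:' + langs.length); });
service.list().subscribe(langs => { received.push('b:' + langs.length); });

// Simulate an HTTP response: one value, then completion.
backend.next([{ code: 'en' }, { code: 'de' }]);
backend.complete();

// A late subscriber still gets the cached value without a new request.
service.list().subscribe(langs => { received.push('c:' + langs.length); });

console.log(requestCount); // 1
console.log(received);     // [ 'a:2', 'b:2', 'c:2' ]
```

The important part is that the piped observable is stored in a field. Calling pipe(shareReplay(1)) on a fresh http.get() inside every list() call would create a new, unshared observable each time.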
Upvotes: 3