Reputation: 2299
I am caching some data retrieved from an API. For logical reasons the stored data is valid only for a limited time, so I am using something like:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000))
What seems obvious to me, but apparently not to the creator of the shareReplay operator, is that if the data is no longer cached it should be re-fetched, or at the very least there should be another parameter that gives me this option, something like:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000, shouldRefresh))
Instead, what the next subscriber will get is null. So, I am looking for an elegant solution to this issue.
Upvotes: 16
Views: 14637
Reputation: 21
As of RxJS v7, the share function, which underlies shareReplay, has picked up some nice abilities. For some reason most are not exposed via shareReplay.
To cache the result of an API call for 15 seconds, use:
source.pipe(
share({
connector: () => new ReplaySubject(),
resetOnComplete: () => timer(15 * 1000),
resetOnRefCountZero: false,
})
)
This relies on the API response Observable completing immediately after it emits, which is certainly true for Angular's HttpClient.
Upvotes: 2
Reputation: 451
I think the best solution for this use case is the following custom 'dataCache' operator function, which will cache the data for a specified time and then re-fetch it and update its subscribers, if there are any.
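import { Observable, timer } from 'rxjs';
import { expand, shareReplay, switchMap } from 'rxjs/operators';

export function dataCache(validForMs: number = 3000000) {
  return function<T>(source: Observable<T>) {
    return source.pipe(
      // After each emission, wait validForMs and then re-subscribe to the source
      expand(() => timer(validForMs).pipe(switchMap(() => source))),
      shareReplay(1)
    );
  }
}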
It can be used in some business logic service as follows. In this example, the data will be re-fetched every 10 seconds:
@Injectable({
providedIn: 'root'
})
export class MyDataService {
constructor(private http: HttpClient) { }
private cache$: Observable<MyData> | undefined = undefined;
getData(): Observable<MyData>
{
this.cache$ ??= this.http.get<MyData>(url.api('my-data')).pipe(dataCache(10000));
return this.cache$;
}
}
Upvotes: 0
Reputation: 4401
Old question, but I had the same bug as you. I realized I just needed to fall back on the API call if the "API data" was null. The new code is the "someApiData$ ||" prefix:
someApiData$ = someApiData$ || this.getData().pipe(shareReplay(1, 3000))
Upvotes: 0
Reputation: 2299
After some trials with the answers in this thread and some other approaches on the web, this is what I ended up with. It gives the ability to:
Observable
My caching util:
export class SharedReplayRefresh<T> {
private sharedReplay$: Observable<T>;
private subscriptionTime: number;
sharedReplayTimerRefresh(
source: Observable<T>, bufferSize: number = 1,
windowTime: number = 3000000, scheduler?: SchedulerLike): Observable<T> {
const currentTime = new Date().getTime();
if (!this.sharedReplay$ ||
currentTime - this.subscriptionTime > windowTime) {
this.sharedReplay$ = source.pipe(shareReplay(
bufferSize, windowTime, scheduler));
this.subscriptionTime = currentTime;
}
return this.sharedReplay$;
}
}
My data-service:
export class DataService {
constructor(private httpClient: HttpClient) { }
private dataSource =
new SharedReplayRefresh<Data>();
private source = this.httpClient.get<Data>(url);
get data$(): Observable<Data> {
return this.dataSource.sharedReplayTimerRefresh(this.source, 1, 1500);
}
}
Upvotes: 11
Reputation: 446
You can change how your stream starts: in this case, use an interval-style source that emits straight away and then, each time the interval elapses, triggers the data load again. Note that interval(3000) on its own waits 3 seconds before its first emission, whereas timer(0, 3000) emits immediately.
When you first subscribe to the stream the source emits and the data is loaded, then it is loaded again every three seconds.
import { timer } from 'rxjs';
const interval$ = timer(0, 3000); // emits straight away, then every 3 seconds
When interval$ emits, use switchMap to switch the Observable out and shareReplay to allow multicasting.
// previous import
import { switchMap, shareReplay } from 'rxjs/operators';
// previous code
const data$ = interval$.pipe(
switchMap(() => getData()),
shareReplay(1) // replay only the latest value to late subscribers
);
You can also wrap interval$ in merge so you can trigger a manual refresh from a Subject as well as from your interval.
import { BehaviorSubject, merge, interval } from "rxjs";
import { shareReplay, switchMap } from "rxjs/operators";
const interval$ = interval(3000);
const reloadCacheSubject = new BehaviorSubject(null);
const data$ = merge(reloadCacheSubject, interval$).pipe(
switchMap(() => getData()),
shareReplay(1) // replay only the latest value to late subscribers
);
reloadCacheSubject.next(null); // causes a reload
StackBlitz example with merge and refreshCache Subject
Upvotes: 3
Reputation: 1366
I had a similar use-case and ended up using the following custom operator.
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
export const cacheValue = <T>(windowTime: (value: T) => number) => (
source: Observable<T>,
) => {
let cache: { value: T; expires: number } | undefined = undefined;
return new Observable<T>((observer) => {
if (cache && cache.expires > Date.now()) {
observer.next(cache.value);
observer.complete();
} else {
return source
.pipe(
tap(
(value) =>
(cache = { value, expires: Date.now() + windowTime(value) }),
),
)
.subscribe(observer);
}
});
};
If your cache expires in 100ms, you would call it as cacheValue(() => 100), and if the value returned by the API has an expiresIn property, you'd call it as cacheValue((value) => value.expiresIn).
Upvotes: 3
Reputation: 11934
Here would be one approach:
const URL = 'https://jsonplaceholder.typicode.com/todos/1';
const notifier = new Subject();
const pending = new BehaviorSubject(false);
const cacheEmpty = Symbol('cache empty')
const shared$ = notifier.pipe(
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true),
fetch(URL).then(r => r.json())
)),
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
const src$ = shared$.pipe(
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
tap(v => v === cacheEmpty && notifier.next()),
filter(v => v !== cacheEmpty)
)
src$.subscribe(v => console.log('[1]', v));
setTimeout(() => {
src$.subscribe(v => console.log('[2]', v));
}, 500);
setTimeout(() => {
src$.subscribe(v => console.log('[3]', v));
}, 1200);
mergeWith is import { merge as mergeWith } from 'rxjs/operators' (as of RxJS 7, it is exported as mergeWith directly).
My reasoning was that I needed to find a way to determine whether the cache of the ReplaySubject
in use is empty or not. It's known that if the cache is not empty and a new subscriber arrives, it will send the cached values synchronously.
So,
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
is essentially the same as
merge(
shared$,
of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)
If there are values in the cache, shared$ will emit and #2 will be unsubscribed from.
If there are no values, #2 will emit and then complete (the fact that it completes won't affect the outer observable).
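The cache-detection reasoning above can be modeled without RxJS. The following is only a rough sketch: TinyReplay and isCacheEmpty are hypothetical names invented for illustration, and the class is a toy stand-in for a ReplaySubject with a buffer of one, not the real implementation. It shows that, because a cached value is replayed synchronously, a subscriber can tell whether the cache was empty by the time subscribe() returns.

```typescript
// Toy stand-in for a ReplaySubject(1); hypothetical, not the RxJS class.
class TinyReplay<T> {
  private cached: { value: T } | undefined;
  private readonly listeners: Array<(value: T) => void> = [];

  next(value: T): void {
    this.cached = { value };
    // Iterate over a copy so listeners may unsubscribe while being notified.
    this.listeners.slice().forEach((listener) => listener(value));
  }

  // Replays the cached value synchronously, before subscribe() returns.
  subscribe(listener: (value: T) => void): () => void {
    if (this.cached !== undefined) {
      listener(this.cached.value);
    }
    this.listeners.push(listener);
    return () => {
      const index = this.listeners.indexOf(listener);
      if (index !== -1) {
        this.listeners.splice(index, 1);
      }
    };
  }
}

// If nothing arrived by the time subscribe() returns, the cache was empty.
function isCacheEmpty<T>(source: TinyReplay<T>): boolean {
  let received = false;
  const unsubscribe = source.subscribe(() => {
    received = true;
  });
  unsubscribe();
  return !received;
}

const replay = new TinyReplay<number>();
const emptyBefore = isCacheEmpty(replay); // nothing cached yet
replay.next(42);
const emptyAfter = isCacheEmpty(replay); // 42 is replayed synchronously
```

In the answer's RxJS version, the delayed cacheEmpty sentinel plays the role of the "nothing arrived" check, and takeUntil(shared$) cancels it when a cached value shows up first.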
Next, we see that if cacheEmpty has been emitted, then we know that it's time to refresh the data.
tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)
Now, let's have a look at how notifier works:
const shared$ = notifier.pipe(
// These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
// the source won't be subscribed more than needed
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
fetch(URL).then(r => r.json())
)),
// The request has finished, we have the new data
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
Upvotes: 0
Reputation: 6974
According to the documentation, the window parameter of the shareReplay operator does not work the way you expect. It is defined as:
the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers
In your code sample, it means that after 3 seconds new Subscribers won't get anything.
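To make the quoted behaviour concrete, here is a minimal, dependency-free sketch that models a replay buffer with a time window. TimedReplayBuffer, push and replay are hypothetical names used only for illustration and are not part of RxJS; the clock is passed in as an assumption so that the example is deterministic.

```typescript
// Toy model of a replay buffer with a time window (not the RxJS implementation).
class TimedReplayBuffer<T> {
  private items: { value: T; at: number }[] = [];

  constructor(
    private readonly bufferSize: number,
    private readonly windowTime: number,
    private readonly now: () => number = Date.now,
  ) {}

  push(value: T): void {
    this.items.push({ value, at: this.now() });
    // Keep only the newest `bufferSize` items.
    if (this.items.length > this.bufferSize) {
      this.items.splice(0, this.items.length - this.bufferSize);
    }
  }

  // What a late subscriber would receive: only items younger than windowTime.
  replay(): T[] {
    const cutoff = this.now() - this.windowTime;
    return this.items
      .filter((item) => item.at > cutoff)
      .map((item) => item.value);
  }
}

let fakeNow = 0;
const buffer = new TimedReplayBuffer<string>(1, 3000, () => fakeNow);
buffer.push('api response');

fakeNow = 1000;
const early = buffer.replay(); // still inside the 3 second window

fakeNow = 4000;
const late = buffer.replay(); // window elapsed: the item is discarded, nothing is re-fetched
```

The point of the model is that expiry only drops the buffered item. Nothing in the buffer knows how to ask the source for a fresh value, which is why the re-fetch has to be arranged separately.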
I think the best way to handle this is to deal with it with an external counter:
private cache$: Observable<any>;
private lastTime: number;
public getCachedData() {
if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
this.cache$ = this.getData().pipe(shareReplay(1));
this.lastTime = new Date().getTime();
}
return this.cache$;
}
This code will "recreate" the Observable when a new Subscriber calls getCachedData() after the previous one has expired.
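The timestamp check can be isolated from RxJS and Angular to see exactly when the cached handle is replaced. This is a sketch under assumptions: createTimedCache, its create parameter and the injected now clock are hypothetical names, and the string handle merely stands in for something like this.getData().pipe(shareReplay(1)).

```typescript
// Hypothetical helper: returns a getter that rebuilds the cached handle
// once it is older than maxAgeMs.
function createTimedCache<T>(
  create: () => T,
  maxAgeMs: number,
  now: () => number = Date.now,
): () => T {
  let cached: { handle: T; createdAt: number } | undefined;
  return () => {
    if (cached === undefined || now() - cached.createdAt > maxAgeMs) {
      cached = { handle: create(), createdAt: now() };
    }
    return cached.handle;
  };
}

// Fake clock and a counter standing in for the real shared Observable.
let clock = 0;
let created = 0;
const getCachedData = createTimedCache(() => `handle #${++created}`, 3000, () => clock);

const first = getCachedData(); // creates the first handle
clock = 2000;
const second = getCachedData(); // 2 seconds old: the same handle is reused
clock = 6000;
const third = getCachedData(); // older than 3 seconds: a new handle is created
```

Callers that already hold the first handle keep it, which is the limitation the next paragraph addresses.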
However, older Subscribers won't get the update from the newly recreated Observable. To keep all of them in sync you may want to use a BehaviorSubject to store the data:
// Everybody subscribes to this Subject
private data$ = new BehaviorSubject<any>(null);
// Time of the last successful refresh (0 means "never refreshed")
private lastTime = 0;
public getCachedData() {
  // Refresh when the stored value is older than 3 seconds
  if (Date.now() - this.lastTime > 3000) {
    return this.refreshData().pipe(
      mergeMap(() => this.data$.asObservable())
    );
  } else {
    return this.data$.asObservable();
  }
}
private refreshData() {
  return this.getData().pipe(
    tap(data => {
      this.lastTime = Date.now();
      this.data$.next(data);
    })
  );
}
The above solution is just an idea and should be improved and tested.
Upvotes: 1