Ravid Goldenberg
Ravid Goldenberg

Reputation: 2299

RxJS cache and refresh with shareReplay

I am using a caching for some data retrieved from an API, for logical reasons the stored data is valid only for a limited time, so I am making use of something like:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000))

What seems to be obvious to me but apparently is not to the creator of the shareReplay operator is that if the data is no longer cached it should be re-fetched, or at the very least I should have another parameter that will give me this option, something like:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000, shouldRefresh))

Instead, what the next subscriber will get is null. So, I am looking for an elegant solution to this issue.

Upvotes: 16

Views: 14637

Answers (8)

Rohan
Rohan

Reputation: 21

As of RxJS v7 the share function which underlies shareReplay has picked up some nice abilities. For some reason most are not exposed via shareReplay.

To cache the result of an API call for 15 seconds use

source.pipe(
  share({
    connector: () => new ReplaySubject(),
    resetOnComplete: () => timer(15 * 1000),
    resetOnRefCountZero: false,
  })
)

This relies on the the API response Observable immediately completing, which is certainly true for Angular's HttpClient.

Upvotes: 2

igorsp7
igorsp7

Reputation: 451

I think the best solution for this use case is the following custom 'dataCache' operator function, which will cache the data for a specified time and then re-fetch it and update its subscribers, if there are any.

export function dataCache(validForMs: number = 3000000) {
  return function<T>(source: Observable<T>) {
    return source.pipe(
      expand(() => timer(validForMs).pipe(switchMap(() => source))),
      shareReplay(1)
    );
  }
}

It can be used in some business logic service as follows. In this example, the data will be re-fetched every 10 seconds:

@Injectable({
  providedIn: 'root'
})
export class MyDataService {

  constructor(private http: HttpClient) { }

  private cache$: Observable<MyData> | undefined = undefined;
  getData(): Observable<MyData>
  {
    this.cache$ ??= this.http.get<MyData>(url.api('my-data')).pipe(dataCache(10000));
    return this.cache$;
  }

}

Upvotes: 0

vidalsasoon
vidalsasoon

Reputation: 4401

Old question but I had the same bug as you. realized I just needed to fallback on api call if "api data" was null. new code in bold.

someApiData$ = **someApiData$ ||** this.getData().pipe(shareReplay(1, 3000))

Upvotes: 0

Ravid Goldenberg
Ravid Goldenberg

Reputation: 2299

After some trails with the answers on this thread and some other approaches on the web, this is what I ended up with. It gives the ability to:

  1. Cache values
  2. Refresh the values automatically if the data is no longer cached
  3. Work directly with the Observable
  4. Specify the duration of the cache lifetime if needed
  5. Unclutter my services and provide a reusable solution

My caching util:

export class SharedReplayRefresh {

    private sharedReplay$: Observable<T>;
    private subscriptionTime: number;


   sharedReplayTimerRefresh(
       source: Observable<T>, bufferSize: number = 1,
       windowTime: number = 3000000,  scheduler?: SchedulerLike): Observable<T> {

        const currentTime = new Date().getTime();
        if (!this.sharedReplay$ || 
            currentTime - this.subscriptionTime > windowTime) {
            this.sharedReplay$ = source.pipe(shareReplay(
                bufferSize, windowTime, scheduler));
            this.subscriptionTime = currentTime;
        }

        return this.sharedReplay$;
    }
}

My data-service:

export class DataService {
    
    constructor(private httpClient: HttpClient) { }

    private dataSource = 
        new SharedReplayRefresh<Data>();
    private source = this.httpClient.get<Data>(url);
    
    get data$(): Observable<Data> {
        return this.dataSource .sharedReplayTimerRefresh(this.source, 1, 1500);
    }
}

Upvotes: 11

markfknight
markfknight

Reputation: 446

You can change how your Stream starts, in this case use an interval to create a stream that emits straight away then, when the interval is met, use this to trigger the data load.

When you first subscribe to the stream the interval is triggered, data loaded, then again three seconds later.

import { interval } from 'rxjs';

const interval$ = interval(3000); // emits straight away then every 3 seconds

When interval$ emits, use switchMap to switch the Observable out and shareReplay to allow multicasting.

// previous import
import { switchMap, shareReplay } from 'rxjs/operators';

// previous code

const data$ = interval$.pipe(
    switchMap(() => getData()),
    shareReplay()
);

You can also wrap the interval$ in merge so you can create a manual refresh based on a Subject as as your interval.

import { BehaviorSubject, merge, interval } from "rxjs";
import { shareReplay, switchMap } from "rxjs/operators";

const interval$ = interval(3000);
const reloadCacheSubject = new BehaviorSubject(null);

const data$ = merge(reloadCacheSubject, interval$).pipe(
  switchMap(() => getData()),
  shareReplay()
);

reloadCacheSubject.next(null); // causes a reload

StackBlitz example with merge and refreshCache Subject

Upvotes: 3

Ivan
Ivan

Reputation: 1366

I had a similar use-case and ended up using the following custom operator.

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

export const cacheValue = <T>(windowTime: (value: T) => number) => (
  source: Observable<T>,
) => {
  let cache: { value: T; expires: number } | undefined = undefined;
  return new Observable<T>((observer) => {
    if (cache && cache.expires > Date.now()) {
      observer.next(cache.value);
      observer.complete();
    } else {
      return source
        .pipe(
          tap(
            (value) =>
              (cache = { value, expires: Date.now() + windowTime(value) }),
          ),
        )
        .subscribe(observer);
    }
  });
};

If your cache expires in 100ms, you would call it as cacheValue(() => 100), and if the value returned by the API has an expiresIn property, you'd call it as cacheValue((value) => value.expiresIn).

Upvotes: 3

Andrei Gătej
Andrei Gătej

Reputation: 11934

Here would be one approach:

const URL = 'https://jsonplaceholder.typicode.com/todos/1';

const notifier = new Subject();
const pending = new BehaviorSubject(false);

const cacheEmpty = Symbol('cache empty')

const shared$ = notifier.pipe(
  withLatestFrom(pending),
  filter(([_, isPending]) => isPending === false),

  switchMap(() => (
    console.warn('[FETCHING DATA]'),
    pending.next(true),
    fetch(URL).then(r => r.json())
  )),

  tap(() => pending.next(false)),
  shareReplay(1, 1000),
);

const src$ = shared$.pipe(
  mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
  tap(v => v === cacheEmpty && notifier.next()),
  filter(v => v !== cacheEmpty)
)

src$.subscribe(v => console.log('[1]', v));

setTimeout(() => {
  src$.subscribe(v => console.log('[2]', v));
}, 500);

setTimeout(() => {
  src$.subscribe(v => console.log('[3]', v));
}, 1200);

StackBlitz.

mergeWith is import { merge as mergeWith } from 'rxjs/operators'(I think that as of RxJs 7, it will be accessible as mergeWith directly).

My reasoning was that I needed to find a way to determine whether the cache of the ReplaySubject in use is empty or not. It's known that if the cache is not empty and a new subscriber arrives, it will send the cached values synchronously.

So,

mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),

is essentially the same as

merge(
  shared$,
  of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)

If there are values in the cache, shared$ will emit and #2 will be unsubscribed to.

If there are no values, #2 will emit and then complete(the fact that it completes won't affect the outer observable).

Next, we see that if cacheEmpty has been emitted, then we know that it's time to refresh the data.

tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)

Now, let's have a look at how notifier works

const shared$ = notifier.pipe(
  
  // These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
  // the source won't be subscribed more than needed
  withLatestFrom(pending),
  filter(([_, isPending]) => isPending === false),

  switchMap(() => (
    console.warn('[FETCHING DATA]'),
    pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
    fetch(URL).then(r => r.json())
  )),

  // The request has finished, we have the new data
  tap(() => pending.next(false)),

  shareReplay(1, 1000),
);

Upvotes: 0

255kb - Mockoon
255kb - Mockoon

Reputation: 6974

According to the documentation, the window parameter of the shareReplay operator is not working like this:

the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers

In your code sample, it means that after 3 seconds new Subscribers won't get anything.

I think the best way to handle this is to deal with it with an external counter:

  private cache$: Observable<any>;
  private lastTime: number;

  public getCachedData() {
    if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
      this.cache$ = this.getData().pipe(shareReplay(1));
      this.lastTime = new Date().getTime();
    }

    return this.cache$;
  }

This code will "recreate" the Observable each time a new Subscriber call the getCachedData().

However, older Subscribers won't get the update of the new recreated Observable. To keep all of them in sync you may want to use a BehaviorSubject to store the data:

  // Everybody subscribe to this Subject
  private data$ = new BehaviorSubject(null);

  public getCachedData() {
    // TODO check time expiration here and call this.refreshData();
    if(timeExpired) {
      return this.refreshData().pipe(
        mergeMap(data => {
          return this.data$.asObservable();
        })
      );
    } else {
      return this.data$.asObservable();
    }
  }
  
  private refreshData() {
    return this.getData().pipe(
      tap(data => {
        this.data$.next(data);
      })
    );
  }

The above solution is just an idea and should be improved and tested.

Upvotes: 1

Related Questions