A. K. Tolentino

Reputation: 2222

RxJS - How to share the output of an expensive observable but rerun that observable if it's requested again after N seconds?

Let's say we have this global const:

const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
    .pipe(throttleTime(1000), shareReplay(1));

After page load, several components will subscribe to this at the same time:

isSignedIn.subscribe(() => console.log('do 1st'));
isSignedIn.subscribe(() => console.log('do 2nd'));
isSignedIn.subscribe(() => console.log('do 3rd'));

The above will only call the API once. However, I need it to call the API again if another component subscribes to it later (i.e. more than 1 second afterwards), for example:

isSignedIn.subscribe(() => console.log('button press'));

How do I do that using RxJS?

Upvotes: 6

Views: 1768

Answers (3)

Simon_Weaver

Reputation: 145970

I think this is what you want:

A pipeable operator (declare globally somewhere and import it)

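import { Observable } from 'rxjs';
import { delay, publishReplay, refCount, repeatWhen } from 'rxjs/operators';

// Resubscribes to the source `duration` ms after it completes; shares the latest value.
export const refreshAfter = (duration: number) => <T>(source: Observable<T>) =>
  source.pipe(
    repeatWhen(completed$ => completed$.pipe(delay(duration))),
    publishReplay(1),
    refCount()
  );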

Then use it like this:

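// fetch() returns a Promise, so wrap it in defer() (from 'rxjs'): each resubscription then issues a new request
data$ = defer(() => fetch('/api/is-signed-in')).pipe(refreshAfter(5000));   // refresh after 5000 ms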

Note: You actually asked for this:

I need it to call the API again (i.e. after 1 second) if another component subscribes to it.

I'm not quite sure this is what you really meant. I think you want the data to be refreshed, after an expiry time, for all components that are currently subscribed. In any case, my answer sends the new value to all listeners. If you really want what you originally described, you'd need to add some kind of alternative repeat trigger.

But if this is for a global constant, the above is what I'm using for the same scenario.

Note: I haven't actually tested the handling of an error condition when the item is repeated, but I think the error will propagate to all listeners.
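
For the behaviour the question literally describes (call the API again only when somebody subscribes after the expiry), one possible sketch is to keep the shared observable in a variable and rebuild it at subscription time once it is too old. This has not been tried against a real API. The names `cacheFor`, `factory`, `ttlMs` and `now` are made up for this example, and the injectable clock exists only so that the demo is deterministic.

```typescript
import { defer, Observable, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical helper: shares the result of factory() between subscribers,
// but calls factory() again when a subscription arrives ttlMs or more
// after the current shared observable was created.
export function cacheFor<T>(
  factory: () => Observable<T>,
  ttlMs: number,
  now: () => number = Date.now
): Observable<T> {
  let cached$: Observable<T> | undefined;
  let createdAt = 0;

  // defer() runs this callback once per subscription.
  return defer(() => {
    if (cached$ === undefined || now() - createdAt >= ttlMs) {
      createdAt = now();
      cached$ = factory().pipe(shareReplay(1));
    }
    return cached$;
  });
}

// Demo with a fake clock and a synchronous source.
let fakeTime = 0;
let calls = 0;
const demo$ = cacheFor(() => of(++calls), 1000, () => fakeTime);

demo$.subscribe(v => console.log('first:', v));   // first: 1
fakeTime = 500;
demo$.subscribe(v => console.log('second:', v));  // second: 1 (replayed, no new call)
fakeTime = 1500;
demo$.subscribe(v => console.log('third:', v));   // third: 2 (expired, factory ran again)
```

With the endpoint from the question this would presumably be used as cacheFor(() => from(fetch('/api/is-signed-in')), 1000). Unlike refreshAfter, nothing new is pushed to existing subscribers; only a subscription that arrives after the expiry triggers another request.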

Upvotes: 1

Buggy

Reputation: 3649

EDITED: This answer is wrong. windowTime only limits how long the last bufferSize events remain available for replay; it does not re-run the source. After that window, late subscribers only see the stream complete.

signature: shareReplay(
  bufferSize?: number,
  windowTime?: number,
  scheduler?: IScheduler
):Observable

@param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
@param {Number} [windowTime=Number.MAX_VALUE] Maximum time length of the replay buffer in milliseconds.

Try adding 1000 as the second argument to shareReplay:

const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
    .pipe(throttleTime(1000), shareReplay(1, 1000));

See shareReplay.ts: be careful with the refCount-- on unsubscribe, as it can trigger additional requests.

Upvotes: 0

Buggy

Reputation: 3649

Suppose we reimplement shareReplay so that it:
- never unsubscribes from the source, even when it has no more subscribers (refCount is removed, which is a potential memory leak).
- accepts a rerunAfter argument: the time in ms that must have passed since the last subscription to the source before the source is run again.

import { Observable, ReplaySubject, of } from 'rxjs';
import { tap, delay } from 'rxjs/operators';

function shareForeverReplayRerun<T>(bufferSize: number, rerunAfter: number) {
  let subject: ReplaySubject<T> | undefined;
  let hasError = false;
  let lastSubTime = 0;

  return (source: Observable<T>) => new Observable<T>(observer => {
    // Re-run the source on first use, after an error, or when rerunAfter ms
    // have passed since the last subscription to the source.
    if (!subject || hasError || (Date.now() - lastSubTime) >= rerunAfter) {
      lastSubTime = Date.now();
      hasError = false;
      // Each run feeds its own subject, so a slow earlier run cannot
      // write into the subject created by a later run.
      const current = new ReplaySubject<T>(bufferSize);
      subject = current;
      source.subscribe({
        next(value) { current.next(value); },
        error(err) {
          hasError = true;
          current.error(err);
        },
        complete() { current.complete(); },
      });
    }

    const innerSub = subject.subscribe(observer);
    // never unsubscribe from source
    return () => {
      innerSub.unsubscribe();
    };
  });
}





const source = of('Initial').pipe(
  tap(()=>console.log('COMPUTE')),
  delay(200),
  shareForeverReplayRerun(1, 1000),
);

source.subscribe(console.log.bind(null, 'syncI:'));
source.subscribe(console.log.bind(null, 'syncII:'));

setTimeout(()=>source.subscribe(console.log.bind(null, 'after500:')), 500);

setTimeout(()=>source.subscribe(console.log.bind(null, 'after900:')), 900);

setTimeout(()=>source.subscribe(console.log.bind(null, 'after1500:')), 1500);

This produces the following output:

COMPUTE
syncI:    Initial
syncII:   Initial
after500: Initial
after900: Initial
COMPUTE
after1500:Initial

Upvotes: 0
