David

Reputation: 51

RxJS: Why does my stream with shareReplay emit the last value on a new subscription after it was completed with the takeUntil operator?

I want to build a function that caches the last emitted value of a stream and re-executes the creation of the stream when a trigger event fires.

function cache<T>(
  source$: () => Observable<T>,
  trigger$: Observable<unknown>
): Observable<T> {
  return trigger$.pipe(startWith(source$), switchMap(source$), shareReplay(1));
}

This already works pretty well. The only problem is when I use it like this:

trigger$ = interval(1_000).pipe(
  takeUntil(this.destroy$.asObservable())
);

cached$ = cache(
  () => of(Math.round(Math.random() * 1_000)),
  this.trigger$
);

When I trigger the destroy event and subscribe afterwards, I get the last value instead of the interval starting anew. If I use share() instead of shareReplay(), it works as I want, except that I don't get a stored value immediately on subscription. So why does shareReplay() behave this way? Is it because of the refCount, and how can I make it work the way I want?

Here is an example: https://stackblitz.com/edit/angular-ivy-ny39z3?file=src%2Fapp%2Fapp.component.ts. Just start a few streams that stop, then start new streams without reloading; I think it then becomes clear what my problem is 😅. Thanks for your time and help.
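For reference, the behavior can also be reproduced synchronously, without Angular or timers. The sketch below is only a stand-in for the StackBlitz and rests on a few assumptions: a manually fired Subject (manualTrigger$) replaces interval, a plain Subject plays the role of this.destroy$, and a hypothetical counter (calls) replaces the random number so the values are predictable.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { shareReplay, startWith, switchMap, takeUntil } from 'rxjs/operators';

// cache() exactly as in the question.
function cache<T>(
  source$: () => Observable<T>,
  trigger$: Observable<unknown>
): Observable<T> {
  return trigger$.pipe(startWith(source$), switchMap(source$), shareReplay(1));
}

// Hypothetical stand-ins: manual trigger instead of interval,
// counter instead of Math.random().
const destroy$ = new Subject<void>();
const manualTrigger$ = new Subject<void>();
let calls = 0;

const cached$ = cache(
  () => of(++calls),
  manualTrigger$.pipe(takeUntil(destroy$))
);

const seen: number[] = [];
cached$.subscribe((v) => seen.push(v)); // source$ runs once via startWith
manualTrigger$.next();                  // trigger fires, source$ runs again
destroy$.next();                        // takeUntil completes the whole stream
cached$.subscribe((v) => seen.push(v)); // late subscriber: replayed value, no new run
```

With shareReplay(1), seen ends up as [1, 2, 2] and calls stays at 2: the late subscriber gets the buffered 2 followed by an immediate completion instead of a fresh run.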

Upvotes: 2

Views: 1050

Answers (2)

David

Reputation: 51

Found a solution by looking at the actual implementation of the shareReplay operator. If I replace shareReplay(1) with the following, it works:

share<T>({
  connector: () => new ReplaySubject(1),
  resetOnError: true,
  resetOnComplete: true,
})

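In the actual shareReplay implementation, resetOnComplete is set to false, so the completed ReplaySubject is kept and replays its last value to every late subscriber.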
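Put together with the cache function from the question, this could look roughly like the sketch below. It assumes RxJS 7 (share only accepts a config object there). The manual trigger, the counter calls, and the explicit resetOnRefCountZero: false (chosen to mirror shareReplay(1) without refCount) are my own additions for illustration, not part of the original code.

```typescript
import { Observable, ReplaySubject, Subject, of } from 'rxjs';
import { share, startWith, switchMap, takeUntil } from 'rxjs/operators';

// Same pipeline as cache() in the question, but with share() configured by hand
// so that the replay buffer is dropped once the source completes.
function cache<T>(
  source$: () => Observable<T>,
  trigger$: Observable<unknown>
): Observable<T> {
  return trigger$.pipe(
    startWith(undefined),
    switchMap(() => source$()),
    share({
      connector: () => new ReplaySubject<T>(1),
      resetOnError: true,
      resetOnComplete: true,      // shareReplay(1) uses false here
      resetOnRefCountZero: false, // assumption: mirror shareReplay(1) without refCount
    })
  );
}

// Hypothetical stand-ins: manual trigger instead of interval,
// counter instead of Math.random().
const destroy$ = new Subject<void>();
const manualTrigger$ = new Subject<void>();
let calls = 0;

const cached$ = cache(
  () => of(++calls),
  manualTrigger$.pipe(takeUntil(destroy$))
);

const seen: number[] = [];
cached$.subscribe((v) => seen.push(v)); // source$ runs once via startWith
manualTrigger$.next();                  // trigger fires, source$ runs again
destroy$.next();                        // completion resets the shared state
cached$.subscribe((v) => seen.push(v)); // late subscriber starts a fresh run
```

Here seen ends up as [1, 2, 3] and calls as 3: after completion the next subscriber gets a new ReplaySubject and a new subscription to the source, while subscribers that join before completion still receive the buffered value immediately.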

Upvotes: 3

olivarra1

Reputation: 3399

Just saw your answer as I was posting mine. shareReplay also has a refCount option that's exactly what you need.

By default, shareReplay uses a refCount of false, meaning that it will not unsubscribe from the source when the reference counter drops to zero, i.e. the inner ReplaySubject will not be unsubscribed (and may potentially run forever).

If refCount is true, the source will be unsubscribed from once the reference count drops to zero, i.e. the inner ReplaySubject will be unsubscribed. All new subscribers will receive value emissions from a new ReplaySubject which in turn will cause a new subscription to the source observable.
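To make the refCount behavior concrete, here is a small sketch against a source that stays open. The names source$, subscriptions and received are made up for illustration. Note that it only covers subscribers leaving while the source is still running; as far as I can tell from the RxJS 7 source, once the source has completed (as with takeUntil in the question), it is resetOnComplete rather than refCount that decides whether the buffer is kept.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let subscriptions = 0;

// Hypothetical source that never completes and emits how often it was subscribed.
const source$ = new Observable<number>((subscriber) => {
  subscriptions += 1;
  subscriber.next(subscriptions);
});

const shared$ = source$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const received: number[] = [];
const first = shared$.subscribe((v) => received.push(v));  // subscribes to source$
const second = shared$.subscribe((v) => received.push(v)); // served from the buffer

first.unsubscribe();
second.unsubscribe(); // ref count reaches zero: source$ is unsubscribed, buffer dropped

shared$.subscribe((v) => received.push(v)); // new ReplaySubject, new source subscription
```

In this sketch received ends up as [1, 1, 2] and subscriptions as 2. With refCount: false the last subscriber would instead be served the buffered 1 without a second subscription to source$.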

More reference: https://rxjs.dev/api/operators/shareReplay

Upvotes: 0
