Reputation: 51
I want to build a function that caches the last emitted value of a stream and reexecutes the creation of the stream when an trigger event is fired.
function cache<T>(
source$: () => Observable<T>,
trigger$: Observable<unknown>
): Observable<T> {
return trigger$.pipe(startWith(source$), switchMap(source$), shareReplay(1));
}
this works already pretty well the only problem is when i use it like this
trigger$ = interval(1_000).pipe(
takeUntil(this.destroy$.asObservable())
);
cached$ = cache(
() => of(Math.round(Math.random() * 1_000)),
this.trigger$
);
and trigger the destroy event and subscribe afterwards than i get the last value instead that the interval starts anew. If I use share() instead of shareReplay() than it works as i want, only that I don't get a stored value immediately on subscription. So why does shareReplay() behave this way, is it because of the refCount, and how can I make it work like I want to?
Here is an example https://stackblitz.com/edit/angular-ivy-ny39z3?file=src%2Fapp%2Fapp.component.ts. Just start a few streams that stop and than start new streams without reloading I think than it is clear what my problem is 😅. Thanks for your time and help.
Upvotes: 2
Views: 1050
Reputation: 51
Found a solution by looking at the actual implementation of the shareReplay
operator. If I change it with the following, it works:
share<T>({
connector: () => new ReplaySubject(1),
resetOnError: true,
resetOnComplete: true,
})
In the actual implementation, resetOnComplete
is set to false
.
Upvotes: 3
Reputation: 3399
Just saw your answer as I was posting mine. shareReplay
also has a refCount
option that's exactly what you need.
By default shareReplay will use refCount of false, meaning that it will not unsubscribe the source when the reference counter drops to zero, i.e. the inner ReplaySubject will not be unsubscribed (and potentially run for ever)
If refCount is true, the source will be unsubscribed from once the reference count drops to zero, i.e. the inner ReplaySubject will be unsubscribed. All new subscribers will receive value emissions from a new ReplaySubject which in turn will cause a new subscription to the source observable.
More reference https://rxjs.dev/api/operators/shareReplay
Upvotes: 0