Reputation: 9396
I'm implementing an Angular service that lets consumers observe various values based on their id.
The essence of it looks like this:
private subjects = new Map<number, Subject<any>>();

public subscribe(id: number, observer: any): Subscription {
  // try getting subject for this id (or undefined if it does not yet exist)
  let subj = this.subjects.get(id);
  // create subject if it does not yet exist
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }
  // subscribe observer
  const subscription = subj.subscribe(observer);
  // set up teardown logic (gets called when subscription is unsubscribed)
  subscription.add(() => {
    // remove subject from the map, if this was the last subscription
    if (subj.observers.length == 0) {
      this.subjects.delete(id);
    }
  });
  // return subscription
  return subscription;
}
Here is the full stackblitz example
The above works fine but the API is a bit cumbersome to use (in the consumers I need to manually keep track of all the subscriptions and make sure to unsubscribe them properly).
I would prefer to have a method that returns an Observable, like this:
public subscribe(id: number): Observable<any> {
  // TODO: Return an observable for this id and make sure that
  // its corresponding subject is in the map iff at least one of the observables
  // for this id has at least one subscription.
  return ...;
}
Because this would allow me to subscribe to the values I need directly from the component templates using the async pipe, where Angular would take care of unsubscribing the observers.
But I can't quite figure out how to implement the logic to remove unused Subjects from the Map when they are no longer used. Is there a good way to do that?
Here is an incomplete stackblitz example with some test cases
Upvotes: 5
Views: 3681
Reputation: 11979
I think you could try something like this:
function subscribe(id: number): Observable<any> {
  /* ... */
  return subj
    .pipe(
      finalize(() => {
        if (subj.observers.length == 0) {
          this.subjects.delete(id);
        }
      })
    );
}
With this, you can also use the async pipe with the AnonymousSubject returned by Subject.lift (which is called as a result of Subject.pipe()). AnonymousSubject makes sure that the observers (e.g. from the template) are added to the observer list of the AnonymousSubject's parent Subject.
finalize() runs its callback when a subscription to the resulting observable ends. This can happen either when the component is destroyed (the async pipe unsubscribes), or when a complete/error notification occurs, which also includes the case when the Subject completes. When a Subject completes, it sends a complete notification to all of its subscribers, meaning that the observers will eventually be removed from the Subject's observer list automatically.
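To make the two teardown paths concrete, here is a simplified, dependency-free model of the behaviour described above. MiniSubject and withFinalize are hypothetical stand-ins written for this illustration; they are not RxJS classes and do not reproduce RxJS internals. The sketch only shows that a finalize-style callback fires once per subscription, whether the subscription ends by unsubscribing or by completion.

```typescript
// Hypothetical stand-ins for illustration only; these are not RxJS classes.
interface MiniObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

class MiniSubject<T> {
  // Current subscribers, comparable to a Subject's observer list.
  readonly observers: MiniObserver<T>[] = [];

  // Adds an observer and returns a function that removes it again.
  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      const index = this.observers.indexOf(observer);
      if (index >= 0) {
        this.observers.splice(index, 1);
      }
    };
  }

  next(value: T): void {
    // Iterate over a copy so observers may unsubscribe while being notified.
    this.observers.slice().forEach((observer) => observer.next(value));
  }

  complete(): void {
    // Detach everyone first, then notify, so the list ends up empty.
    const snapshot = this.observers.splice(0, this.observers.length);
    snapshot.forEach((observer) => {
      if (observer.complete) {
        observer.complete();
      }
    });
  }
}

// Wraps a source so that `callback` runs exactly once per subscription,
// whether that subscription ends by unsubscribing or by completion.
function withFinalize<T>(source: MiniSubject<T>, callback: () => void) {
  return {
    subscribe(observer: MiniObserver<T>): () => void {
      let finished = false;
      let detach: () => void = () => {};
      const finish = (): void => {
        if (finished) {
          return; // already torn down, do nothing
        }
        finished = true;
        callback();
        detach();
      };
      detach = source.subscribe({
        next: (value) => observer.next(value),
        complete: () => {
          if (observer.complete) {
            observer.complete();
          }
          finish();
        },
      });
      return finish;
    },
  };
}

// Usage: count how often the callback fires.
const source = new MiniSubject<number>();
let finalized = 0;
const wrapped = withFinalize(source, () => finalized++);

const unsubscribeA = wrapped.subscribe({ next: () => {} });
wrapped.subscribe({ next: () => {} });

unsubscribeA();    // explicit unsubscribe, e.g. a destroyed component
source.complete(); // completion ends the remaining subscription
```

In this model the counter is incremented once for the explicit unsubscribe and once for the completion, and calling unsubscribeA() a second time has no further effect.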
show1 = true;
show12 = true;
show2 = true;

v1$: Observable<any>;
v12$: Observable<any>;
v2$: Observable<any>;

constructor(public valueService: ValueService) {
}

async ngOnInit() {
  await this.sleep(2000);
  // const s11 = this.valueService.subscribe(1, v => this.v1 = v);
  this.v1$ = this.valueService.subscribe(1);
  await this.sleep(2000);
  // const s21 = this.valueService.subscribe(2, v => this.v2 = v);
  this.v2$ = this.valueService.subscribe(2);
  await this.sleep(2000);
  // const s12 = this.valueService.subscribe(1, () => {});
  this.v12$ = this.valueService.subscribe(1);
  await this.sleep(2000);
  // s12.unsubscribe();
  this.show12 = false;
  await this.sleep(2000);
  // s11.unsubscribe();
  this.show1 = false;
  await this.sleep(2000);
  // s21.unsubscribe();
  this.show2 = false;
}
<div *ngIf="show1">
  v1: {{ v1$ | async }}
</div>
<div *ngIf="show12">
  v12: {{ v12$ | async }}
</div>
<div *ngIf="show2">
  v2: {{ v2$ | async }}
</div>
public subscribe(id: number): Observable<any> {
  let subj = this.subjects.get(id);
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }
  return subj.pipe(
    finalize(() => {
      if (subj.observers.length === 1) {
        this.subjects.delete(id);
      }
    })
  );
}
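As @ggradnig mentioned, the check should be subj.observers.length === 1, since finalize(), at least in RxJS 6.5.x, runs its callback before any other unsubscriptions take place. At that point the subscriber being torn down is still in the Subject's observer list, so a length of 1 means it is the last one.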
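Since the === 1 check relies on the order in which the finalize() callback runs relative to the observer's removal, one alternative is to do the bookkeeping explicitly: remove the listener first, then drop the map entry if nothing is left. The following is a minimal, dependency-free sketch of that idea using plain callbacks instead of RxJS. ValueRegistry, listen, emit and has are hypothetical names introduced only for this illustration.

```typescript
// Hypothetical, dependency-free stand-in for the service's bookkeeping.
type Listener<T> = (value: T) => void;

class ValueRegistry<T> {
  // One bucket of listeners per id; a bucket exists only while it is non-empty.
  private buckets = new Map<number, Set<{ listener: Listener<T> }>>();

  // Registers a listener for `id` and returns a teardown function.
  listen(id: number, listener: Listener<T>): () => void {
    let bucket = this.buckets.get(id);
    if (!bucket) {
      bucket = new Set<{ listener: Listener<T> }>();
      this.buckets.set(id, bucket);
    }
    // Wrap the listener so the same function can be registered more than once.
    const entry = { listener };
    bucket.add(entry);

    const owned = bucket;
    let active = true;
    return () => {
      if (!active) {
        return; // teardown is idempotent
      }
      active = false;
      // Remove the listener first, so the emptiness check is a plain "=== 0".
      owned.delete(entry);
      if (owned.size === 0) {
        this.buckets.delete(id);
      }
    };
  }

  // Delivers a value to every listener currently registered for `id`.
  emit(id: number, value: T): void {
    const bucket = this.buckets.get(id);
    if (bucket) {
      bucket.forEach((entry) => entry.listener(value));
    }
  }

  // Reports whether an entry for `id` currently exists in the map.
  has(id: number): boolean {
    return this.buckets.has(id);
  }
}

// Usage: two listeners on id 1, torn down one after the other.
const registry = new ValueRegistry<string>();
const received: string[] = [];

const stopFirst = registry.listen(1, (v) => received.push(`first:${v}`));
const stopSecond = registry.listen(1, (v) => received.push(`second:${v}`));

registry.emit(1, "a");
stopFirst();           // entry for id 1 stays, one listener remains
registry.emit(1, "b");
stopSecond();          // last listener gone, entry for id 1 is removed
```

In the actual service, the same teardown function could presumably be returned from the subscribe callback passed to the Observable constructor, so that the async pipe triggers it on unsubscribe; that wiring is an assumption and is not shown here.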
Upvotes: 4