Robert Hegner
Robert Hegner

Reputation: 9396

Detect when a Subject has no more subscriptions

I'm implementing an angular service that lets consumers observe various values based on their id:

The essence of it looks like this:

private subjects = new Map<number, Subject<any>>();

public subscribe(id: number, observer: any): Subscription {
  // try getting subject for this id (or undefined if it does not yet exist)
  let subj = this.subjects.get(id);

  // create subject if it does not yet exist
  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  // subscribe observer
  const subscription = subj.subscribe(observer);

  // set up teardown logic (gets called when subscription is unsubscribed)
  subscription.add(() => { 
    // remove subject from the map, if this was the last subscription
    if (subj.observers.length == 0) {
      this.subjects.delete(id);
    }
  });

  // return subscription
  return subscription;
}

Here is the full stackblitz example

The above works fine but the API is a bit cumbersome to use (in the consumers I need to manually keep track of all the subscriptions and make sure to unsubscribe them properly).

I would prefer to have a method that returns an Observable like this:

public subscribe(id: number): Observable<any> {
  // TODO: Return an observable for this id and make sure that
  // its corresponding subject is in the map iff at least one of the observables
  // for this id has at least one subscription.
  
  return ...;
}

Because this would allow me to subscribe to the values I need directly from the component templates using the async pipe, where angular would take care of unsubscribing the observers.

But I can't quite figure out how I can implement the logic to remove unused Subjects from the Map when they are no longer used. Is there a good way to do that?

Here is an incomplete stackblitz examples with some test cases

Upvotes: 5

Views: 3681

Answers (1)

Andrei Gătej
Andrei Gătej

Reputation: 11979

I think you could try something like this:

function subscribe(id: number): Observable<any> {

  /* ... */

  return sbj
    .pipe(
      finalize(() => {
        if (subj.observers.length == 0) {
          this.subjects.delete(id);
        }
      })
    );
}

With this, you can also use the async pipe with the AnonymousSubject returned by Subject.lift(which is called as a result of Subject.pipe()). AnonymousSubject makes sure that the observers(e.g from the template) will be added to the ``AnonymousSubject's parent Subject`'s list.

finalize() is called when the source(e.g the Subject) is unsubscribed. This can either happen when the component is destroyed, or when a complete/error event occurs, which also includes the case when the Subject completes. When a Subject completes, it will send a complete notification to all of its subscribers, meaning that the observers will eventually be automatically removed from the Subject's observer list.

EDIT

app.component.ts

  show1 = true;
  show12 = true;
  show2 = true;

  v1$: Observable<any>;
  v12$: Observable<any>;
  v2$: Observable<any>;

  constructor(public valueService: ValueService) {
  }

  async ngOnInit() {
    await this.sleep(2000);
    // const s11 = this.valueService.subscribe(1, v => this.v1 = v);
    this.v1$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // const s21 = this.valueService.subscribe(2, v => this.v2 = v);
    this.v2$ = this.valueService.subscribe(2);
    await this.sleep(2000);
    // const s12 = this.valueService.subscribe(1, () => {});
    this.v12$ = this.valueService.subscribe(1);
    await this.sleep(2000);
    // s12.unsubscribe();
    this.show12 = false
    await this.sleep(2000);
    // s11.unsubscribe();
    this.show1 = false;
    await this.sleep(2000);
    // s21.unsubscribe();
    this.show2 = false
  }

app.component.html

<div *ngIf="show1">
  v1: {{ v1$ | async }}
</div>

<div *ngIf="show12">
  v12: {{ v12$ | async }}
</div>

<div *ngIf="show2">
  v2: {{ v2$ | async }}
</div>

value.service.ts

public subscribe(id: number): Observable<any> {
  let subj = this.subjects.get(id);

  if (!subj) {
    subj = new Subject<any>();
    this.subjects.set(id, subj);
  }

  return subj.pipe(
    finalize(() => {
      if (subj.observers.length === 1) {
        this.subjects.delete(id);
      }
    })
  )
}

StackBlitz.

As @ggradnig mentioned, the check should be subj.observers.length === 1, since finalize(),at least in RxJs 6.5.x, runs its callback before any other unsubscriptions take place.

Upvotes: 4

Related Questions