Kim Kern

Reputation: 60357

Unsubscribe when subscribed subject completes

I have a subject that is subscribed to an event listener Observable (fromEvent). When the Stop button is clicked, the subject completes and the event listener should be removed. Currently, I keep track of the subscription and unsubscribe from it manually, which also removes the event listener:

const { fromEvent, Subject } = rxjs
const { map } = rxjs.operators

const subject = new Subject()

subject.subscribe(() => console.log('focused'))

const sub = fromEvent(document.querySelector('input'), 'focus').subscribe(subject)

document.querySelector('button').onclick = () => {
  // This is what I would like to avoid
  sub.unsubscribe();
  subject.complete();
}
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
<input>
<button>Stop</button>

Is it possible to automatically unsubscribe the subscription sub (and remove the event listener) when subject completes? Something like takeUntilComplete(subject) without installing an additional library.

Edit: The example here is oversimplified. In my real scenario, there is no button that lets the subject complete.

Upvotes: 2

Views: 603

Answers (3)

bryan60

Reputation: 29325

You're overthinking this a bit; this is all you need:

const { takeUntil } = rxjs.operators

// Emits once per click on the Stop button
const stop$ = fromEvent(document.querySelector('button'), 'click');

fromEvent(document.querySelector('input'), 'focus').pipe(takeUntil(stop$)).subscribe(subject)

The completion from takeUntil propagates down to the subscribed subject, so all subscriptions and event listeners end with a button click.

The only thing that won't happen: if the subject completes some way other than the button click, the input event listener will still fire, even though the subscriber is no longer listening. Completion propagates downstream but not upstream.
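To make the downstream-only point concrete, here is a dependency-free sketch. `createToySubject` and `listen` are hypothetical toy stand-ins written for this illustration, not the RxJS `Subject` or `fromEvent`; they only mimic the relevant behavior (a completed subject ignores values, and a listener stays attached until its teardown runs).

```javascript
// Toy stand-ins for illustration only; these are NOT the RxJS classes.

// Minimal "subject": forwards values to its observers until it completes.
function createToySubject() {
  const observers = [];
  let stopped = false;
  return {
    subscribe(fn) { observers.push(fn); },
    next(value) { if (!stopped) observers.forEach((fn) => fn(value)); },
    complete() { stopped = true; },
  };
}

// Minimal "fromEvent(...).subscribe(observer)": attaches a listener and
// returns a teardown function that removes it again.
let handlerCalls = 0;
function listen(target, eventName, observer) {
  const handler = (event) => {
    handlerCalls += 1;
    observer.next(event.type);
  };
  target.addEventListener(eventName, handler);
  return () => target.removeEventListener(eventName, handler);
}

const input = new EventTarget();
const toySubject = createToySubject();
const received = [];
toySubject.subscribe((value) => received.push(value));

const teardown = listen(input, 'focus', toySubject);

input.dispatchEvent(new Event('focus')); // delivered to the subscriber
toySubject.complete();                   // downstream is finished...
input.dispatchEvent(new Event('focus')); // ...but the listener still runs
const callsAfterComplete = handlerCalls;

teardown();                              // explicit upstream teardown
input.dispatchEvent(new Event('focus')); // listener is gone, nothing runs
const callsAfterTeardown = handlerCalls;
```

Completing the toy subject stops delivery to its subscriber, but the handler keeps being invoked until the teardown is called. That gap is what an upstream `takeUntil` is meant to close.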

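Alternatively, creating new operators is pretty easy (riffing off your solution):

const { Observable } = rxjs
const { takeUntil } = rxjs.operators

// Notifier emits once when `subject` completes; takeUntil then ends the source.
const takeUntilComplete = (subject) =>
  takeUntil(new Observable(o =>
    subject.subscribe({ complete: () => o.next() })))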

which can be used as simply as:

fromEvent(document.querySelector('input'), 'focus').pipe(takeUntilComplete(subject)).subscribe(subject)

Upvotes: 3

Kim Kern

Reputation: 60357

Not very elegant, but creating a new Observable which emits on complete of the subject works:

fromEvent(...).pipe(
  takeUntil(
    new Observable((subscriber) =>
      subject.subscribe({ complete: () => subscriber.next() })
    )
  )
).subscribe(subject)

Upvotes: 2

Rafi Henig

Reputation: 6414

Consider extending the Subject class so that it emits a notification when it is unsubscribed, errors, or completes:

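import { Subject } from 'rxjs';

class NotifyStateSubject<T> extends Subject<T> {
  // Emits once when this subject errors, completes, or is unsubscribed.
  public stopped = new Subject<void>();

  private notify(): void {
    this.stopped.next();
    // complete() rather than unsubscribe(), so that a second notify() call
    // (e.g. complete() followed by unsubscribe()) is a no-op instead of throwing.
    this.stopped.complete();
  }

  public error(err: any): void {
    this.notify();
    super.error(err);
  }

  public complete(): void {
    this.notify();
    super.complete();
  }

  public unsubscribe(): void {
    this.notify();
    super.unsubscribe();
  }
}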

Then use it as follows:

fromEvent(...).pipe(takeUntil(subject.stopped)).subscribe(subject)

Upvotes: 1
