Reputation: 60357
I have a subject that subscribes to an event listener Observable (fromEvent
). When the button stop is clicked, the subject completes and the event listener should be removed. Currently, I'm keeping track of the subscription and unsubscribe it, which also removes the event listener:
const { fromEvent, Subject } = rxjs
const { map } = rxjs.operators
const subject = new Subject()
subject.subscribe(() => console.log('focused'))
const sub = fromEvent(document.querySelector('input'), 'focus').subscribe(subject)
document.querySelector('button').onclick = () => {
// This is what I would like to avoid
sub.unsubscribe;
subject.complete();
}
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
<input>
<button>Stop</button>
Is it possible to automatically unsubscribe the subscription sub
(and remove the event listener) when subject
completes? Something like takeUntilComplete(subject)
without installing an additional library.
Edit: The example here is oversimplified. In my real scenario, there is no button that lets the subject complete.
Upvotes: 2
Views: 603
Reputation: 29325
you're over thinking this a bit, this is all you need:
const stop$ = fromEvent(document.querySelector('button'), 'click');
fromEvent(document.querySelector('input'), 'focus').pipe(takeUntil(stop$)).subscribe(subject)
the completion from the takeUntil
will propagate down to the subscribed subject. All subscriptions and event listeners will end with a button click.
The only thing that WONT happen, is if the Subject completes some way OTHER than button click, then your input event will still fire, though the subscriber won't be listening anymore. Completion propagates downstream but not upstream.
Alternatively creating new operators is pretty easy (riffing off your solution):
const takeUntilComplete = (subject) =>
takeUntil(new Observable(o =>
subject.subscribe(null, null, () => o.next())))
which can be used as simply as:
fromEvent(document.querySelector('input'), 'focus').pipe(takeUntilComplete(subject)).subscribe(subject)
Upvotes: 3
Reputation: 60357
Not very elegant, but creating a new Observable which emits on complete of the subject works:
fromEvent(...).pipe(
takeUntil(
new Observable((subscriber) =>
this._trackedInteractions$?.subscribe({ complete: () => subscriber.next() })
)
)
).subscribe(subject)
Upvotes: 2
Reputation: 6414
Consider extending Subject
class to notify its unsubscribe/error/complete as below:
class NotifyStateSubject<T> extends Subject<T>{
public stopped = new Subject<void>();
private notify(): void {
this.stopped.next();
this.stopped.unsubscribe();
}
public error(err: any):void {
this.notify();
super.error(err)
}
public complete():void{
this.notify();
super.complete()
}
public unsubscribe():void {
this.notify();
super.unsubscribe();
}
}
then use it as following:
fromEvent(...).pipe(takeUntil(subject.stopped$)).subscribe(subject)
Upvotes: 1