Reputation: 17397
I have an array of Filters held in an Observable, and I'd like to add/remove filters from it. The code below only adds a Filter the first time the function runs. The second time, nothing happens.
private _filters$ = new BehaviorSubject<Filter[]>([]);

addFilter(added: Filter) {
  debugger
  // adding to array of filters
  this._filters$.pipe(
    tap(d => { debugger; }),
    first(),
    map(filters => ([...filters, added]))
  ).subscribe(this._filters$);
}
So my question is: why does this happen? Why does it run only once? (By the way, first() is not the reason.)
I know I can make the code work like so:
private _filters$ = new BehaviorSubject<Filter[]>([]);
currentFilters;

init() {
  this._filters$.subscribe(f => this.currentFilters = f);
}

addFilter(added: Filter) {
  this._filters$.next([...this.currentFilters, added]);
}
Upvotes: 0
Views: 267
Reputation: 18663
Actually, it is because of first. The first time you run the function, it creates the stream and subscribes to the BehaviorSubject. When the stream receives the first event, it forwards the mapped value to the BehaviorSubject, and then first completes. Because the BehaviorSubject is the stream's subscriber, that completion is passed on to it, so the BehaviorSubject completes too. The second time you run the function, the BehaviorSubject is already shut down, so it immediately completes any new subscription without emitting a value.
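If it helps, here is a small standalone sketch of that sequence. It assumes the pipeable-operator imports from `rxjs/operators`, and it uses `string` as a stand-in for `Filter`, since the real type isn't shown in the question. The `seen` and `completed` variables exist only to record what the subject does.

```typescript
import { BehaviorSubject } from 'rxjs';
import { first, map } from 'rxjs/operators';

// Assumption: stand-in for the question's Filter type.
type Filter = string;

const filters$ = new BehaviorSubject<Filter[]>([]);

// Record everything the subject emits, and whether it has completed.
const seen: Filter[][] = [];
let completed = false;
filters$.subscribe({
  next: f => seen.push(f),
  complete: () => { completed = true; },
});

function addFilter(added: Filter): void {
  filters$.pipe(
    first(),
    map(filters => [...filters, added]),
  ).subscribe(filters$); // the subject is the observer, so it receives next AND complete
}

addFilter('a'); // pushes ['a'], then first() completes and takes the subject with it
addFilter('b'); // the subject is already completed, so no value reaches map

console.log(seen, completed);
```

After the first call the subject has emitted the new array and completed. The second call never gets a value to map, so nothing is pushed.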
Without knowing too much about your actual goal, my suggestion is to put the subject at the top of the pipeline instead of at the bottom.
// You don't actually need the caching behavior yet so just use a `Subject`
private _filters$ = new Subject<Filter>();

// Hook this up to whatever is going to be using these filters
private _pipeline$ = this._filters$.pipe(
  // Use scan instead of mapping back into self
  scan((filters: Filter[], newFilter: Filter) => [...filters, newFilter], [] as Filter[]),
  // Store the latest value for new subscribers
  shareReplay(1)
);

// Now this method is just pushing into the `Subject` and the pipeline never has to be torn down
addFilter(added: Filter) {
  debugger;
  this._filters$.next(added);
}
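For reference, a minimal standalone sketch of the same idea outside a class, so it can be run on its own. The names `added$`, `filters$`, `snapshots` and `lateSnapshots` are made up for the example, and `Filter` is again assumed to be a plain `string`.

```typescript
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

// Assumption: stand-in for the question's Filter type.
type Filter = string;

// Individual filters are pushed in here...
const added$ = new Subject<Filter>();

// ...and accumulated into the full list here.
const filters$ = added$.pipe(
  scan((filters: Filter[], next: Filter) => [...filters, next], [] as Filter[]),
  shareReplay(1),
);

// Subscribe before pushing: shareReplay only connects to the source
// once it has a subscriber, and a plain Subject does not buffer.
const snapshots: Filter[][] = [];
filters$.subscribe(f => snapshots.push(f));

added$.next('a');
added$.next('b');

// A late subscriber immediately receives the latest accumulated list.
const lateSnapshots: Filter[][] = [];
filters$.subscribe(f => lateSnapshots.push(f));

console.log(snapshots, lateSnapshots);
```

One thing to watch for: anything pushed into the `Subject` before the pipeline has its first subscriber is dropped, so hook up the consumer before calling `addFilter`.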
Upvotes: 2