Ced

Reputation: 17397

Adding to previous result, Observable pipeline only running once

live example

I have an array of filters exposed as an Observable, and I'd like to add filters to it and remove them from it. The code below is what I have so far; it only adds a filter the first time the function runs.

The second time nothing happens.

private _filters$ = new BehaviorSubject<Filter[]>([]);

addFilter(added: Filter) {
    debugger
    // adding to array of filters
    this._filters$.pipe(
        tap(d => { debugger; }),
        first(), 
        map(filters => ([...filters, added]))
    ).subscribe(this._filters$);
}

So my question is: why does this happen? Why does it run only once? (By the way, first() is not the reason.)

I know I can make the code work like so:

private _filters$ = new BehaviorSubject<Filter[]>([]);

currentFilters;

init() {
   this._filters$.subscribe(f => this.currentFilters = f);
}

addFilter(added: Filter) {
    this._filters$.next([...this.currentFilters, added]);
}

Upvotes: 0

Views: 267

Answers (1)

paulpdaniels

Reputation: 18663

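Actually, it is because of first(). The first time you run the function, it builds the pipeline and subscribes to the BehaviorSubject, which immediately emits its current value. first() lets that one value through and then completes. Because you passed the BehaviorSubject itself to subscribe() as the observer, it receives the mapped value and then the completion as well, so the BehaviorSubject itself is completed. The second time you run the function, the BehaviorSubject is already stopped, so any new subscriber is completed immediately without ever receiving a value.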
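To make the mechanism visible, here is a minimal standalone sketch of the same pattern outside a class. It assumes RxJS 6 or later with pipeable operators; `filters$`, `seen` and `completed` are illustrative names, and `string` stands in for the `Filter` type from the question.

```typescript
import { BehaviorSubject } from 'rxjs';
import { first, map } from 'rxjs/operators';

// Stand-in for the question's private _filters$ field.
const filters$ = new BehaviorSubject<string[]>([]);

// Record everything an ordinary subscriber observes.
const seen: string[][] = [];
let completed = false;
filters$.subscribe({
  next: f => seen.push(f),
  complete: () => { completed = true; },
});

function addFilter(added: string): void {
  filters$.pipe(
    first(),
    map(filters => [...filters, added]),
  ).subscribe(filters$); // the subject is the observer, so it also gets complete()
}

addFilter('a'); // emits ['a'], then first() completes and so does filters$
addFilter('b'); // filters$ is already stopped, so nothing is emitted

console.log(JSON.stringify(seen)); // [[],["a"]]
console.log(completed);            // true
```

The second call never produces `['a', 'b']`: the subject was completed at the end of the first call, so the pipeline has nothing to read from and nothing to write to.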

Without knowing too much about your actual goal, my suggestion is to put the subject at the top of the pipeline instead of at the bottom.

// You don't actually need the caching behavior yet, so just use a `Subject`
private _filters$ = new Subject<Filter>();

// Hook this up to whatever is going to be using these filters
private _pipeline$ = this._filters$.pipe(
  // Use scan to accumulate instead of mapping back into the subject itself
  scan((filters, newFilter) => [...filters, newFilter], [] as Filter[]),
  // Store the latest value for new subscribers
  shareReplay(1)
);

// Now this method is just pushing into the `Subject` and the pipeline never has to be torn down
addFilter(added: Filter) {
    debugger
    this._filters$.next(added);
}
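As a rough check of how this behaves, the same pipeline can be run standalone. This is a sketch assuming RxJS 6 or later; `added$`, `snapshots` and `late` are illustrative names, and `Filter` is aliased to `string` only so the snippet compiles on its own.

```typescript
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

type Filter = string; // stand-in for the real Filter type

// Individual filters are pushed in here, one at a time.
const added$ = new Subject<Filter>();

// scan keeps the running array; shareReplay(1) caches the latest one.
const filters$ = added$.pipe(
  scan((filters, next) => [...filters, next], [] as Filter[]),
  shareReplay(1),
);

const snapshots: Filter[][] = [];
filters$.subscribe(f => snapshots.push(f));

added$.next('a');
added$.next('b');

// A subscriber that arrives later still gets the latest array.
const late: Filter[][] = [];
filters$.subscribe(f => late.push(f));

console.log(JSON.stringify(snapshots)); // [["a"],["a","b"]]
console.log(JSON.stringify(late));      // [["a","b"]]
```

Two things differ from the BehaviorSubject version: nothing is emitted until the first filter is added, and the pipeline only starts accumulating once something has subscribed to it, so filters pushed before the first subscription are not recorded.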

Upvotes: 2
