Aj1
Aj1

Reputation: 973

Angular Rxjs Observable Chain with ngrx store

I am using Angular 7 with ngrx store. The store contains app state, which I am subscribing to in app components on the OnInit. There are couple of variables in store, which are interchangeable (swapped with a button).

Here is my example code in a component.

this.appState.getGasStationSushi().pipe(switchMap((sushi) => {
  this.gsSushi = sushi;
  return this.appState.getStarbucksSushi();
}), switchMap((sushi) => {
  this.sbSushi = sushi;
  return this.service.compare(this.gsSushi, this.sbSushi);
}).subscribe((result)=>{
  console.log(result);
}));

With a button click in view, user can change both sushi values, which is causing the last subscription call twice, which makes sense (RxJS). I could remove switchMap and write something like

-- gas station sushi susbcription 
   -- star bucks sushi susbcription 
      -- compare

I am not really huge fan of this, I am sure there must be a rxjs/operator of sort. Could someone please make a suggestion?

Also, tried forkjoin, but with ngrx store it seems like, one need to call first or last something like below. Here is a reference for the above statement forkjoinWithstore

const $sushiObs = [
  this.appState.getGasStationSushi().pipe(first()),
  this.appState.getStarbucksSushi().pipe(first())
];
forkjoin($sushiObs).subscribe((result) => {
  console.log(result);
});

If I use the above pattern, the subscriptions fire for the first time but not after that at all.

Upvotes: 2

Views: 1755

Answers (1)

Marian
Marian

Reputation: 4079

First of all, here is a working example on stackblitz.

Instead of using a store I just created a mock class SushiState that returns observables.

class SushiState {
  private _gas = new BehaviorSubject(1);
  private _starbucks = new BehaviorSubject(1);

  public get gas() {
    return this._gas.asObservable();
  }
  public get starbucks() {
    return this._gas.asObservable();
  }

  public increaseSushi(n = 1) {
    this._gas.next(this._gas.value + n);
    this._starbucks.next(this._starbucks.value + n);
  }

  public static compareSushi(g: number, s: number): string {
    return `gas is ${g}, starbucks is ${s}`;
  }
}

As for the component, here is the code.

export class AppComponent implements OnInit {
  state: SushiState;

  gasSushi: Observable<number>;
  sbSushi: Observable<number>;

  combined: string;
  combinedTimes = 0;
  zipped: string;
  zippedTimes = 0;

  ngOnInit() {
    this.state = new SushiState;
    this.gasSushi = this.state.gas;
    this.sbSushi = this.state.gas;

    const combined = combineLatest(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('combined', sushi);
        this.combinedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    combined.subscribe(result => this.combined = result);

    const zipped = zip(
      this.gasSushi,
      this.sbSushi,
    ).pipe(
      tap((sushi) => {
        console.log('zipped', sushi);
        this.zippedTimes++;
      }),
      map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
    );
    zipped.subscribe(result => this.zipped = result);
  }

  increaseSushi() {
    this.state.increaseSushi();
  }

}

If you run it on stackblitz and check the console, you will see the following behaviour:

console output

If we use combined latest, we combine the observables separately and only care about the latest state, resulting in 2 calls of console.log.

We could instead use zip, which waits for both observables to emit, before producing an output. This looks like the perfect fit for our "Increase Both" button, but there's a problem: if the starbucksSushi get incremented separately (maybe from a different part of the app), the zipped version will wait for the gas station sushi to get updated too.

To suggest a third solution, you could use combineLatest to combine the sushi counters, and then use debounceTime operator to wait for some number of milliseconds before emitting the output.

const debounced = zip(
  this.gasSushi,
  this.sbSushi,
).pipe(
  tap((sushi) => {
    console.log('debounced', sushi);
    this.debouncedTimes++;
  }),
  map((sushi) => SushiState.compareSushi(sushi[0], sushi[1])),
  debounceTime(100),
);
debounced.subscribe(result => this.debounced = result);

This will react to changes in all the sources, but not more often than 100ms.

Finally, the reason why you had to do first():

forkJoin joins the observables after they complete (which can happen only once, so it's not a good fit for a continuous stream) and is more suitable for "promise-like" work, e.g. HTTP calls, process completions, etc. Incidentally, if you only take one element from a stream, the resulting stream completes after the single emission.

P.S.

I recommend utilizing the async pipe for working with observables (like i've done with the properties

gasSushi: Observable<number>;
sbSushi: Observable<number>;

and inside the template then

<div>
  <h3>starbucks sushi</h3>
  <p>{{sbSushi | async}}</p>
</div>

instead of

result => this.zipped = result

I have used both in this example so you can compare them. From my experience, working with observables gets much easier, once you stop converting "de-observing" them ahead of time and just allow the async pipe to do its work.

On top of that, if you use subscribe somewhere in your component, you should unsubscribe when the component is destroyed - which is not hard at all, but if we never subscribe explicitly, and allow the async pipe to do the subscription, it also handles the destruction for us :)

Upvotes: 1

Related Questions