Franck

Reputation: 23

Wait for observables to complete before calling next one

I am using RxJS as shown below and the values arrive in an unexpected order. The following code can be run in StackBlitz.

EDIT - Simplified code. The problem can be reduced to two observers of the same BehaviorSubject that receive the values in a different order.

this.state$ = new BehaviorSubject<any>({ a: null, b: null });

const obs1 = this.state$
  .subscribe((value) => {
    console.log('obs 1', value);
    if (value.a == 1) this.state$.next({ a: 2, b: 3 });
  });

const obs2 = this.state$.subscribe((value) => console.log('obs 2', value));

this.state$.next({ a: 1, b: 1 });

This is what the logs look like

obs 1 {a: null, b: null}
obs 2 {a: null, b: null}
obs 1 {a: 1, b: 1}
obs 1 {a: 2, b: 3}
obs 2 {a: 2, b: 3}
obs 2 {a: 1, b: 1}

I would expect the observers of the same behavior subject to receive emitted values in the same order, wherever these values come from.

Previous code

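import { CommonModule } from '@angular/common';
import { Component } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

class sampleState {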
  a: any;
  b: any;
}

const initialState: sampleState = {
  a: null,
  b: null,
};

@Component({
  selector: 'my-app',
  standalone: true,
  imports: [CommonModule],
  template: ``,
})
export class App {
  name = 'Angular';

  protected _state$: BehaviorSubject<sampleState>;
  state$: Observable<sampleState>;

  constructor() {
    this._state$ = new BehaviorSubject<sampleState>(initialState);
    this.state$ = this._state$.asObservable();

    this.select((state) => state.a).subscribe((value) => {
      console.log('a ' + value);
      if (value == 1) this.setState({ b: 3 });
    });

    this.select((state) => state.b).subscribe((value) => {
      console.log('b ' + value);
    });

    this._state$.subscribe((value) => console.log(value));

    this.setState({ a: 1, b: 1 });
  }

  public get state() {
    return this._state$.getValue();
  }

  public setState(newState: any) {
    this._state$.next({
      ...this.state,
      ...newState,
    });
  }

  public select<K>(mapFn: (state: sampleState) => K): Observable<K> {
    return this._state$.asObservable().pipe(
      map((state: sampleState) => mapFn(state)),
      distinctUntilChanged()
    );
  }
}

I get the following logs

{a: null, b: null}
{a: 1, b: 3}
{a: 1, b: 1}

while I would expect

{a: null, b: null} // initialisation
{a: 1, b: 1} // setState
{a: 1, b: 3} // if a == 1 => set b = 3

It appears the first setState is not completed before calling the next one. Is there a way to change this behavior so the states change sequentially?

Upvotes: 2

Views: 421

Answers (2)

Picci

Reputation: 17762

To complement the answer from @BizzyBob this is what happens.

There are 3 consumers of the stream represented by this.state$ (or this._state$ for that matter) and these are the 3 subscriptions in the code.

Any time this._state$ notifies something via next, all 3 subscriptions react in the order in which they were registered: first this.select((state) => state.a).subscribe(..., then this.select((state) => state.b).subscribe(..., and finally this.state$.subscribe((value) => console.log('the object', value));.

The last one, though, is the only one to write the state object to the console, hence when it receives the first notification it writes {a: null, b: null}.

The first subscription (this.select((state) => state.a).subscribe(...) has by then already done its job of emitting the new value {a: 1, b: 3}, which is therefore the second value that this.state$.subscribe((value) => console.log('the object', value)); writes on the console.

Finally, in this synchronous order, the notification from this.setState({ a: 1, b: 1 }); arrives, which causes this.state$.subscribe((value) => console.log('the object', value)); to write {a: 1, b: 1}.

So, the whole thing is synchronous, but, if you look at the code, it goes first down to this.state$.subscribe((value) => console.log('the object', value));, then up to the first subscription that fires a new notification and then to the bottom where the last notification occurs.
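
The recursion described above can be reproduced without RxJS. What follows is only a minimal sketch: MiniBehaviorSubject is a hypothetical stand-in that mimics the synchronous, in-order delivery of a BehaviorSubject (it is not the RxJS implementation), applied to the simplified two-observer example from the question.

```typescript
type State = { a: number | null; b: number | null };
type Listener = (value: State) => void;

// Hypothetical stand-in for BehaviorSubject (an assumption, not RxJS code):
// replays the current value on subscribe and delivers every next()
// synchronously, in subscription order.
class MiniBehaviorSubject {
  private listeners: Listener[] = [];

  constructor(private current: State) {}

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
    listener(this.current);
  }

  next(value: State): void {
    this.current = value;
    // Each listener runs to completion before the loop moves on.
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

const log: string[] = [];
const state$ = new MiniBehaviorSubject({ a: null, b: null });

state$.subscribe((value) => {
  log.push(`obs 1 ${JSON.stringify(value)}`);
  // Re-entrant emission: this inner next() notifies obs 1 and obs 2
  // before the outer loop has delivered { a: 1, b: 1 } to obs 2.
  if (value.a === 1) state$.next({ a: 2, b: 3 });
});

state$.subscribe((value) => {
  log.push(`obs 2 ${JSON.stringify(value)}`);
});

state$.next({ a: 1, b: 1 });

console.log(log.join('\n'));
```

The last three entries of log come out as obs 1 {a: 2, b: 3}, obs 2 {a: 2, b: 3}, obs 2 {a: 1, b: 1}, which is the same ordering as in the question's simplified logs.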

Upvotes: 1

BizzyBob

Reputation: 14740

You're seeing this behavior because all of the operations are synchronous and you are emitting a new value from within your "select a" subscription, which causes the subscribe logic to be executed recursively.

To prevent the recursive behavior, you can utilize the asapScheduler like this:

this.state$ = this._state$.pipe(observeOn(asapScheduler));

Now, update your select method to use this.state$ instead of this._state$ and you should be good to go.
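
To see why deferring delivery restores the order, here is a rough, dependency-free model of the idea. It is a sketch under stated assumptions, not RxJS code: DeferredSubject, pending and flush are hypothetical names, and the hand-flushed task array merely stands in for the scheduler running queued deliveries after the current synchronous code has finished.

```typescript
type State = { a: number | null; b: number | null };
type Listener = (value: State) => void;

// Hypothetical task queue standing in for the scheduler (an assumption).
const pending: Array<() => void> = [];

// Runs queued deliveries in FIFO order; tasks queued while flushing
// are appended and run after the ones already waiting.
function flush(): void {
  let task = pending.shift();
  while (task !== undefined) {
    task();
    task = pending.shift();
  }
}

// Hypothetical subject that never calls a listener inline:
// every delivery is queued instead.
class DeferredSubject {
  private listeners: Listener[] = [];

  constructor(private current: State) {}

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
    const value = this.current; // capture the value at subscription time
    pending.push(() => listener(value));
  }

  next(value: State): void {
    this.current = value;
    for (const listener of [...this.listeners]) {
      pending.push(() => listener(value));
    }
  }
}

const log: string[] = [];
const state$ = new DeferredSubject({ a: null, b: null });

state$.subscribe((value) => {
  log.push(`obs 1 ${JSON.stringify(value)}`);
  // This next() only queues deliveries, so obs 2 still gets { a: 1, b: 1 } first.
  if (value.a === 1) state$.next({ a: 2, b: 3 });
});

state$.subscribe((value) => {
  log.push(`obs 2 ${JSON.stringify(value)}`);
});

state$.next({ a: 1, b: 1 });
flush(); // stands in for the scheduler draining its queue

console.log(log.join('\n'));
```

In this model both observers see {a: null, b: null}, then {a: 1, b: 1}, then {a: 2, b: 3}. The trade-off is that nothing is delivered until the queue is drained, so subscribers no longer receive values synchronously.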

Here's a StackBlitz you can check out.

Upvotes: 2
