Med Karim Garali

Reputation: 943

Wait until a second Observable emits

In RxJS there is a takeUntil operator, but there is no waitUntil operator that makes the current Observable wait for a second Observable to emit.

My end goal is to make many Observables wait until my init$ Observable has emitted a single value before they continue. init$ should run only once, and until it emits a non-null value, my other Observables have to wait.

In this simple example, I want to add an operator to pipedSource that waits for init$, so that source does not emit its value until init$ has emitted.

import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';

const init$ = new BehaviorSubject(null);

const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
    skipWhile((res)=> res === null)
    //wait until init$ emits a non null value
)

//first subscription to source
pipedSource.subscribe(val => console.log(val));

source.next({profile:"me"});

//init emits once
setTimeout(()=>{
  init$.next(1);
},2000);

// a second subscription to source
setTimeout(()=>{
  pipedSource.subscribe(val => console.log(val));
},3000);

wanted result:

//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile" 

Upvotes: 2

Views: 7967

Answers (2)

Kurt Hamilton

Reputation: 13515

You want to run a second observable when a first observable emits a non-null value. To do this, use concatMap or switchMap after skipWhile.

ngOnInit() {
  const init$ = new BehaviorSubject(null);

  const source = new BehaviorSubject({profile:"me"});
  const pipedSource = init$
  .pipe(
      skipWhile((res)=> res === null),
      concatMap(() => source)
  );

  pipedSource.subscribe(val => console.log('first', val));

  //init emits once
  setTimeout(()=>{
    init$.next(1);
  },2000);

  // a second subscription to source
  setTimeout(()=>{
    pipedSource.subscribe(val => console.log('second', val));
  }, 3000);
}

Here I am subscribing to the init$ observable first, waiting for it to emit a non-null value, and then switching to the source observable.

DEMO: https://stackblitz.com/edit/angular-p7kftd
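The same idea can be packaged as a reusable operator, so the pipe starts from source as in the question. This is only a sketch: waitUntil is a hypothetical helper name, not something RxJS ships, and it assumes the source replays its latest value (as a BehaviorSubject does), because nothing subscribes to the source until the notifier has emitted.

```typescript
import { BehaviorSubject, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { filter, switchMap, take } from 'rxjs/operators';

type Profile = { profile: string };

// Hypothetical helper (not part of RxJS): subscribes to the source only after
// `notifier$` has emitted its first non-null value, then mirrors the source.
function waitUntil<T>(notifier$: Observable<unknown>): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) =>
    notifier$.pipe(
      filter((value) => value !== null && value !== undefined),
      take(1), // only the first non-null emission matters
      switchMap(() => source$),
    );
}

const init$ = new BehaviorSubject<number | null>(null);
const source = new BehaviorSubject<Profile | null>(null);
const received: string[] = [];

const pipedSource = source.pipe(
  filter((value): value is Profile => value !== null),
  waitUntil<Profile>(init$),
);

// First subscription: init$ still holds null, so nothing is delivered yet.
pipedSource.subscribe((val) => received.push(`first:${val.profile}`));
source.next({ profile: 'me' });
const countBeforeInit = received.length; // 0

// init$ emits once; the first subscriber now gets the current source value.
init$.next(1);

// A later subscription sees init$'s stored value and proceeds immediately.
pipedSource.subscribe((val) => received.push(`second:${val.profile}`));

console.log(received); // ['first:me', 'second:me']
```

The timers from the question are replaced by direct next() calls so the order is easy to follow. With a source that does not replay, values emitted before init$ would simply be missed.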

Upvotes: 4

Picci

Reputation: 17752

If I understand your question correctly, I see two potential cases.

The first is that your source Observable starts emitting values independently of wait$, and you only start using those values once wait$ has emitted. This behavior can be implemented with the combineLatest function, like this:

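import { Subject, combineLatest, interval } from 'rxjs';
import { map, take } from 'rxjs/operators';

// the notifier that source$ has to wait for
const wait$ = new Subject();

//emits value every 500ms
const source$ = interval(500);

combineLatest([source$, wait$])
    .pipe(
        map(([s, v]) => s), // to filter out the value emitted by wait$
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait$.next(1);
}, 2000);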

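In this case the sequence printed on the console does not start from 0. It starts from the latest value source$ had emitted when wait$ fired after 2 seconds (around 2 or 3, depending on timer ordering), since the earlier values were emitted before wait$ had a value to combine with.

The second case is when you want source to start emitting its values only after wait$ has emitted. In this case you can use the switchMap operator, as here: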

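import { Subject, interval } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

const wait_2$ = new Subject();

//emits value every 500ms
const source_2$ = interval(500);

wait_2$
    .pipe(
        // source_2$ is subscribed only when wait_2$ emits
        switchMap(() => source_2$),
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait_2$.next(1);
}, 4000);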

In this case source_2$ is only subscribed once wait_2$ emits at 4 seconds, so a sequence starting with 0 gets printed on the console, with the first value arriving 500ms later.
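To see the difference between the two cases without depending on timers, here is a minimal sketch that swaps interval(500) for a cold, synchronous from([0, 1, 2]). That substitution is an assumption made purely for illustration, and the names combined and switched are hypothetical.

```typescript
import { Subject, combineLatest, from } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

const wait$ = new Subject<number>();

// Cold source: every new subscription replays 0, 1, 2 from the start.
const source$ = from([0, 1, 2]);

const combined: number[] = [];
const switched: number[] = [];

// Case 1: source$ runs right away; only its latest value is paired with wait$.
combineLatest([source$, wait$])
  .pipe(map(([s]) => s))
  .subscribe((val) => combined.push(val));

// Case 2: source$ is not subscribed until wait$ emits, so it starts from 0.
wait$
  .pipe(switchMap(() => source$))
  .subscribe((val) => switched.push(val));

wait$.next(1);

console.log(combined); // [2]
console.log(switched); // [0, 1, 2]
```

With combineLatest the values 0 and 1 are gone by the time wait$ emits, while switchMap restarts the cold source from the beginning.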

Upvotes: 0
