Reputation: 943
In RxJS, there is a takeUntil operator, but there is no waitUntil operator that makes the current observable wait for a second observable to emit.
My final goal is to make several observables wait until my observable init$ emits a single value before they continue their execution. init$ has to run once, and until then my other observables have to wait until init$ emits any value other than null.
In this simple example, I want to add an operator to pipedSource that waits for init$, so that source has to wait until init$ emits before emitting its own value.
import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';
const init$ = new BehaviorSubject(null);
const source = new BehaviorSubject(null);
const pipedSource = source
  .pipe(
    skipWhile((res) => res === null)
    // wait until init$ emits a non-null value
  );
// first subscription to source
pipedSource.subscribe(val => console.log(val));
source.next({ profile: "me" });
// init$ emits once
setTimeout(() => {
  init$.next(1);
}, 2000);
// a second subscription to source
setTimeout(() => {
  pipedSource.subscribe(val => console.log(val));
}, 3000);
Wanted result:
// after 2s of waiting
// first subscription logs {profile: "me"}
// after 3s
// second subscription logs {profile: "me"}
Upvotes: 2
Views: 7967
Reputation: 13515
You want to run a second observable when a first observable emits a non-null value. To do this, use concatMap or switchMap after skipWhile.
ngOnInit() {
  const init$ = new BehaviorSubject(null);
  const source = new BehaviorSubject({ profile: "me" });
  const pipedSource = init$
    .pipe(
      skipWhile((res) => res === null),
      concatMap(() => source)
    );
  pipedSource.subscribe(val => console.log('first', val));
  // init$ emits once
  setTimeout(() => {
    init$.next(1);
  }, 2000);
  // a second subscription to source
  setTimeout(() => {
    pipedSource.subscribe(val => console.log('second', val));
  }, 3000);
}
Here I am subscribing to the init$ observable first, waiting for it to emit a non-null value, and then switching to the source observable.
DEMO: https://stackblitz.com/edit/angular-p7kftd
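The same idea can be wrapped in a reusable operator so it can be applied inside the pipe of any source, as the question asks. This is only a minimal sketch: waitUntil is a hypothetical helper, not part of RxJS, and it assumes the gate should open on its first non-null value and stay open afterwards. The subscriptions here run synchronously (no setTimeout) to keep the example short.

```typescript
import { BehaviorSubject, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { filter, skipWhile, switchMap, take } from 'rxjs/operators';

// Hypothetical helper (not an RxJS built-in): delays the subscription to the
// source until gate$ has emitted its first non-null value.
function waitUntil<T>(gate$: Observable<unknown>): MonoTypeOperatorFunction<T> {
  return (source$: Observable<T>) =>
    gate$.pipe(
      filter(value => value !== null), // ignore the initial null of a BehaviorSubject
      take(1),                         // only the first non-null value opens the gate
      switchMap(() => source$),        // then subscribe to the actual source
    );
}

const init$ = new BehaviorSubject<number | null>(null);
const source = new BehaviorSubject<{ profile: string } | null>(null);
const pipedSource = source.pipe(
  skipWhile(res => res === null),
  waitUntil(init$),
);

const first: unknown[] = [];
const second: unknown[] = [];

// First subscription: nothing is delivered while init$ still holds null.
pipedSource.subscribe(val => first.push(val));
source.next({ profile: 'me' });

// init$ emits once: the first subscriber now receives the current profile.
init$.next(1);

// A later subscription gets the value immediately, because init$ replays 1.
pipedSource.subscribe(val => second.push(val));

console.log(first, second);
```

Because the source is only subscribed to once the gate opens, this behaves as "wait" only for sources that replay their latest value, such as a BehaviorSubject. With a plain Subject, values emitted before init$ fires would be lost.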
Upvotes: 4
Reputation: 17752
If I understand your question correctly, I see two potential cases.
The first one is that your source Observable starts emitting its values independently of wait$. Only when wait$ emits do you start using the values emitted by source. This behavior can be implemented using the combineLatest function like this:
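import { combineLatest, interval, Subject } from 'rxjs';
import { map, take } from 'rxjs/operators';

// assumed: wait$ is a plain Subject, like wait_2$ in the second case
const wait$ = new Subject();
// emits a value every 500ms
const source$ = interval(500);
combineLatest(source$, wait$)
  .pipe(
    map(([s, v]) => s), // keep only the value from source$, dropping the one from wait$
    take(5), // just to limit the output to 5 values
  )
  .subscribe(val => console.log(val));
setTimeout(() => {
  wait$.next(1);
}, 2000);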
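In this case the values printed on the console do not start from 0: the sequence starts from the latest value source$ had emitted when wait$ fired after 2 seconds (around 2 or 3, depending on timer ordering) and continues from there.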
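For comparison, RxJS also has a built-in skipUntil operator that covers a similar case: the source runs independently and its values are dropped until a notifier emits. Unlike combineLatest, it does not re-emit the last source value when the notifier fires. The sketch below uses plain Subjects (gate$ and values$ are illustrative names, not taken from the question) so that the order of events is explicit.

```typescript
import { Subject } from 'rxjs';
import { skipUntil } from 'rxjs/operators';

const gate$ = new Subject<void>();     // plays the role of wait$
const values$ = new Subject<number>(); // plays the role of source
const seen: number[] = [];

values$.pipe(skipUntil(gate$)).subscribe(v => seen.push(v));

values$.next(0); // skipped: the gate has not emitted yet
values$.next(1); // skipped
gate$.next();    // from now on values pass through
values$.next(2);
values$.next(3);

console.log(seen); // [2, 3]
```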
The second case is when you want source to start emitting its values only after wait$ has emitted. In this case you can use the switchMap operator, such as here:
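import { interval, Subject } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

const wait_2$ = new Subject();
// emits a value every 500ms
const source_2$ = interval(500);
wait_2$
  .pipe(
    switchMap(() => source_2$), // subscribe to source_2$ only once wait_2$ emits
    take(5), // just to limit the output to 5 values
  )
  .subscribe(val => console.log(val));
setTimeout(() => {
  wait_2$.next(1);
}, 4000);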
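In this case, once wait_2$ emits after 4 seconds, a sequence starting with 0 gets printed on the console (the first value arrives 500 ms after that, since interval only starts when it is subscribed to).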
Upvotes: 0