Reputation: 2641
I have two functions: one returns observableA and the other returns observableB. I want both the getter and the subscription of observableB to be delayed until observableA has completed (although it can also happen that observableA has already completed by the time I subscribe to observableB).
I've already tried pipe and skipUntil, but unfortunately this only prevents the execution rather than delaying it when observableA has not completed yet.
functionA() {
  this.observableA$ = getObservableA()
  this.observableA$.subscribe(_ => {
    // A: This line should execute *before* line B
  })
}

functionB() {
  this.observableB$ = getObservableB() // This getter should execute *after* line A
  this.observableB$.subscribe(_ => {
    // B: This line should execute *after* line A
  })
}

// Two functions are called independently
functionA()
functionB()
Would be great to find a very RxJS-ish way :)
Update 1: The problem with concat:
As noted, both functions are called independently, which would result in a double execution of observableA$ when using concat as suggested. The subscription callback in functionB would also be executed twice, which I don't want.
functionA() {
  this.observableA$ = getObservableA()
  this.observableA$.subscribe(_ => {
    // A: This line should execute *before* line B
  })
}

functionB() {
  Observable.concat(
    this.observableA$, // I get executed once again :(
    Observable.defer(() => getObservableB()) // To prevent earlier execution
  ).subscribe(() => {
    // B: This line should execute *after* line A
    console.log("I'm logged twice, once for each observable :(")
  })
}

// Two functions are called independently
functionA()
functionB()
Update 2: @Wilhelm Olejnik solved it by using an additional BehaviorSubject
Upvotes: 3
Views: 2934
Reputation: 1690
I faced a similar situation, for which I created a service, SyncService, to synchronize the observables. For me, ObsA and ObsB were coming from separate components. ComponentA got some initialization data that was needed by ComponentB after it loaded its own data.
In ComponentA I subscribed to ObsA (returned from getInitData()) and called into the sync service:
public initialize() {
  this.apiSvc.getInitData().subscribe((initData) => {
    this.data = initData;
    this.syncService.setA(initData);
  });
}
Then, in ComponentB I subscribe to ObsB (returned from getBData()), and then subscribe to the sync service:
public loadBData() {
  this.apiSvc.getBData().subscribe((dataB) => {
    // setB emits a single { dataA, dataB } object once both values are known
    this.syncService.setB(dataB).subscribe(({ dataA, dataB }) => {
      this.doStuffWithAAndB(dataA, dataB);
    });
  });
}
And finally, the sync service looks like this:
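import { EventEmitter, Injectable } from '@angular/core';
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// DataA and DataB are the application's own types.
@Injectable()
export class SyncService {
  private dataA: DataA | null = null;
  private dataB: DataB | null = null;
  private gotAEvent = new EventEmitter<DataA>();

  public setA(dataA: DataA) {
    this.dataA = dataA;
    if (this.dataB != null) {
      // ObsB was already resolved!
      this.gotAEvent.emit(dataA);
    }
  }

  public setB(dataB: DataB): Observable<{ dataA: DataA; dataB: DataB }> {
    this.dataB = dataB;
    if (this.dataA != null) {
      // ObsA was already resolved, so emit both values right away
      return of({ dataA: this.dataA, dataB });
    } else {
      // Wait for setA() to announce dataA, then pair it with dataB
      return this.gotAEvent.pipe(
        map((dataA: DataA) => ({ dataA, dataB }))
      );
    }
  }
}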
Upvotes: 1
Reputation: 2507
If I understand everything correctly, you want to delay the execution of getObservableB until some observable has been assigned to the property observableA$.
Maybe it is doable with some Proxy trick, but I think it's easier to change observableA$ into a null-initialized BehaviorSubject. Then you can observe observableA$ and create observableB$ once a non-null value is emitted.
https://stackblitz.com/edit/rxjs-mm2edy
import { BehaviorSubject, timer } from 'rxjs';
import { filter, switchMap, mapTo, tap } from 'rxjs/operators';

const getObservableA = () => timer(100).pipe(
  tap(() => console.log('getObservableA')),
  mapTo('A')
);

const getObservableB = () => timer(100).pipe(
  tap(() => console.log('getObservableB')),
  mapTo('B')
);

class Test {
  // init observableA$ as BehaviorSubject with null state
  observableA$ = new BehaviorSubject(null);
  observableB$;

  functionA() {
    getObservableA().subscribe(val => {
      console.log(val)
      this.observableA$.next(val); // notify subscribers that observableA$ is ready
    });
  }

  functionB() {
    this.observableB$ = this.observableA$.pipe(
      filter(value => value !== null), // ignore observableA$ until initialized
      switchMap(value => getObservableB())
    )
    this.observableB$.subscribe(console.log)
  }
}

const test = new Test();
test.functionB();
setTimeout(() => test.functionA(), 500);

// getObservableA
// A
// getObservableB
// B
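
The BehaviorSubject approach works regardless of call order because the subject remembers its latest value, so a subscriber that arrives late still receives it. The dependency-free TypeScript sketch below illustrates that replay idea in isolation. ReplayGate, open, whenOpen and runScenario are hypothetical names made up for this illustration; they are not part of RxJS, and this is not how RxJS implements its subjects.

```typescript
// Hypothetical helper (not an RxJS API): remembers the first value it is given
// and hands it to every listener, whether the listener registered before or
// after the value arrived.
class ReplayGate<T> {
  private value: T | undefined;
  private opened = false;
  private waiting: Array<(value: T) => void> = [];

  // Called when "A" has finished. Only the first call has an effect.
  open(value: T): void {
    if (this.opened) {
      return;
    }
    this.opened = true;
    this.value = value;
    const queued = this.waiting;
    this.waiting = [];
    queued.forEach(listener => listener(value));
  }

  // Called by "B". Runs immediately if the gate is already open,
  // otherwise the listener is queued until open() is called.
  whenOpen(listener: (value: T) => void): void {
    if (this.opened) {
      listener(this.value as T);
    } else {
      this.waiting.push(listener);
    }
  }
}

// Runs A and B in either order and records what happened.
function runScenario(aCompletesFirst: boolean): string[] {
  const log: string[] = [];
  const gate = new ReplayGate<string>();

  const completeA = () => {
    log.push('A done');
    gate.open('A');
  };
  const startB = () => gate.whenOpen(a => log.push(`B runs after ${a}`));

  if (aCompletesFirst) {
    completeA();
    startB();
  } else {
    startB();
    completeA();
  }
  return log;
}

console.log(runScenario(true));
console.log(runScenario(false));
```

In both orders the B callback runs exactly once and only after A has finished, which is the behavior the question asks for. In real RxJS code the BehaviorSubject with a null filter shown above plays the role of this gate.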
Upvotes: 2