Dennis Zoma
Dennis Zoma

Reputation: 2641

RxJS Looking for something like "delayUntil"

I have two functions, one returns observableA and one observableB. I want the getter as well as the subscription of observableB to delay until observableA is complete (but it can also be the case then when I’m subscribing to observableB, observableA is already completed).

I’ve already tried to use pipe and skipUntil but unfortunately this only prevents the execution and doesn’t delays it if observableA is not completed yet.

functionA() {
  this.observableA$ = getObservableA()
  this.observableA$.subscribe(_ => {
    // A: This line should execute *before* line B
  })
}

functionB() {
  this.observableB$ = getObservableB()  // This getter should execute *after* line A
  this.observableB$.subscribe(_ => {
    // B: This line should execute *after* line A
  })
}

// Two functions are called independently
functionA()
functionB()

Would be great to find a very RxJS-ish way :)


Update 1: The problem with concat: As noted both functions are called independently which would result in a double-execution of observableA$ when using concat like suggested. Also the subscription in functionB would be executed twice what I don't want.

functionA() {
  this.observableA$ = getObservableA()
  this.observableA$.subscribe(_ => {
    // A: This line should execute *before* line B
  })
}

functionB() {
  Observable.concat(
    this.observableA$,  // I get executed once again :(
    Observable.defer(() => getObservableB())  // To prevent earlier execution
  ).subscribe(() => {
    // B: This line should execute *after* line A
    console.log("I'm logged twice, once for each observable :(")
  })
}

// Two functions are called independently
functionA()
functionB()

Update 2: @Wilhelm Olejnik solved it by using an additional BehaviorSubject

Upvotes: 3

Views: 2934

Answers (2)

djs
djs

Reputation: 1690

I faced a similar situation in which I created a service SyncService to synchronize the observables. For me, ObsA and ObsB were coming from separate components. ComponentA got some initialization data that was needed by ComponentB after it loaded its own data.

In ComponentA I subscribed to ObsA (returned from getInitData()) and called into the sync service:

public initialize() {
   this.apiSvc.getInitData().subscribe((initData) => {
      this.data = initData;
      this.syncService.setA(initData);
   }
}

Then, in ComponentB I subscribe to ObsB (return from getBData()), and then subscribe to the sync service:

public loadBData() {
    this.apiSvc.getBData().subscribe((dataB) => {
        this.syncService.setB(dataB).subscribe((dataA, dataB) => {
            this.doStuffWithAAndB(dataA, dataB);
        }
    }
}

And finally, the sync service looks like this:

@Injectable
export class SyncService {
    private dataA: DataA = null;
    private dataB: DataB = null;
    private gotAEvent: new EventEmitter<DataA>();

    public setA(dataA: DataA) {
        this.dataA = dataA;
        if (this.dataB != null) {
            // ObsB was already resolved!
            this.gotAEvent.emit(dataA);
        }
    }

    public setB(dataB: DataB) {
        this.dataB = dataB;
        if (this.dataA != null) {
            return of({this.dataA, this.dataB});
        } else {
            return this.gotAEvent().map((dataA: DataA) => {
                return {dataA, dataB};
            }
        }
    }
}

Upvotes: 1

Wilhelm Olejnik
Wilhelm Olejnik

Reputation: 2507

If I understand everything correctly you want to delay getObservableB execution until some observable will be assigned to property observableA$.

Maybe it is doable with some Proxy trick but I think it's easier to change observableA$ into null-initialized BehaviorSubject. Then you can observe observableA$ and create observableB$ when non-null signal will be emitted.

https://stackblitz.com/edit/rxjs-mm2edy

import { of, BehaviorSubject, timer } from 'rxjs';
import { filter, switchMap, mapTo, tap } from 'rxjs/operators';

const getObservableA = () => timer(100).pipe(
  tap(() => console.log('getObservableA')),
  mapTo('A')
);

const getObservableB = () => timer(100).pipe(
  tap(() => console.log('getObservableB')),
  mapTo('B')
);

class Test {
  // init observableA$ as BehaviorSubject with null state
  observableA$ = new BehaviorSubject(null);
  observableB$;


  functionA() {
    getObservableA().subscribe(val => {
      console.log(val)
      this.observableA$.next(val);      // notify subscribers that observableA$ is ready
    });
  }

  functionB() {
    this.observableB$ = this.observableA$.pipe(
      filter(value => value !== null),            // ignore observableA$ until initalized
      switchMap(value => getObservableB())
    )
    this.observableB$.subscribe(console.log)
  }
}

const test = new Test();

test.functionB();
setTimeout(() => test.functionA(), 500);

// getObservableA
// A
// getObservableB
// B

Upvotes: 2

Related Questions