Phil

Reputation: 7566

rxjs - delay function return until a stream value meets a condition and is unsubscribed

I have a method that is triggering a server request. It has access to a stream from the redux-store and should execute a callback as soon as the result of the request is found in the mentioned stream. So basically:

private resultStream$: Observable<boolean>;
doFetch(args, callback) {
    store.dispatchHttpRequest(args);
    // this should only be executed when resultStream$ emits true
    callback();
}

How do I make sure callback() is called only once resultStream$ emits true, and that I have unsubscribed from the stream afterwards, so there is no memory leak?

I have tried takeWhile(), putting the dispatchHttpRequest call in the setter of the condition variable (the one takeWhile checks) and the callback itself in subscribe(), but I seem to be doing something wrong because I get bugs in my app. To be precise, the callback is only executed when doFetch() is called more than once, so the first request never triggers the callback.

Upvotes: 1

Views: 1395

Answers (2)

martin

Reputation: 96891

You can use finally, which is called when the chain is disposed, that is, when all observers have unsubscribed or when the chain has completed or errored.

private resultStream$: Observable<boolean>;

doFetch(args, callback) {
    store.dispatchHttpRequest(args);

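    // nothing runs until the caller subscribes to the returned chain
    return this.resultStream$
      // be aware that `takeWhile` completes when the predicate function returns false
      .takeWhile(val => !Boolean(val) /* or whatever ... */)
      // `finally` also runs on error or manual unsubscribe, not only on completion
      .finally(() => callback());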
}

If you know the chain is going to complete, you don't need to unsubscribe yourself.

Upvotes: 1

bryan60

Reputation: 29335

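Try this. filter lets only truthy values through, and take(1) completes the chain after the first one, so the subscription is cleaned up automatically: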

private resultStream$: Observable<boolean>;
doFetch(args, callback) {
    this.resultStream$.filter(r => r).take(1).subscribe(e => callback());

    store.dispatchHttpRequest(args);
}
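
To see what the filter/take(1) combination does, here is a dependency-free sketch. MiniSubject, onFirstTrue and the dispatch parameter are hypothetical names made up for illustration, not RxJS or store APIs; MiniSubject only stands in for a Subject-like result stream so the snippet runs without the library. It assumes the simulated store emits synchronously during dispatch, which is why the subscription is set up first.

```typescript
// Hypothetical stand-in for a Subject-like stream; NOT part of RxJS.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Delivers a value to every listener registered at the time of the call.
  next(value: T): void {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    this.listeners.slice().forEach(listener => listener(value));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

// Mirrors filter(r => r).take(1): run `callback` on the first truthy value,
// then drop the subscription so nothing stays registered.
function onFirstTrue(source: MiniSubject<boolean>, callback: () => void): void {
  const unsubscribe: () => void = source.subscribe(value => {
    if (!value) return; // the filter step: ignore falsy emissions
    unsubscribe();      // the take(1) step: release the subscription
    callback();
  });
}

const resultStream = new MiniSubject<boolean>();
const log: string[] = [];

// `dispatch` is a hypothetical replacement for store.dispatchHttpRequest(args).
function doFetch(dispatch: () => void, callback: () => void): void {
  onFirstTrue(resultStream, callback); // subscribe first ...
  dispatch();                          // ... then trigger the request
}

doFetch(
  () => {
    // Simulated store: emits false, then true twice, all synchronously.
    resultStream.next(false);
    resultStream.next(true);
    resultStream.next(true);
  },
  () => log.push("callback")
);
```

After the call, log holds a single "callback" entry and resultStream has no listeners left, even though true was emitted twice. In RxJS 6 and later the real chain is written with pipe(): this.resultStream$.pipe(filter(r => r), take(1)).subscribe(() => callback()), and finally is named finalize.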

Upvotes: 2
