RRGT19

Reputation: 1677

Conditionally cancel all inner observables or switch to a new one that emits only one value

I'm developing an application using Angular and I'm trying to build a flow that combines observables that emit multiple values with HTTP requests that emit a single value.

I want to open an InAppBrowser to let the user do a transaction (a payment) and read the callback, which is a new URL that the browser will try to load. I need to subscribe to multiple browser events to get notified when there is an error, a new URL to load, etc.

I would like to handle this RxJS combination correctly and cancel all the observables if some condition is met to guarantee performance and no potential memory leaks.

I have tried to follow:

  1. RxJS switchMap does not cancel inner merged observable
  2. Using SwitchMap() to Handle Canceling Previous Requests
  3. RxJS: switchMap inside merge doesn't work to make a conditional observable return

All of the following observables and requests are fakes, to keep the example simple.

Observables that emit one value:

getOrderId(): Observable<any> {
  return this.http.get<any>(
    'https://jsonplaceholder.typicode.com/posts/1'
  );
}

notifyBackend(url: string): Observable<any> {
  return this.http.get<any>(
    'https://jsonplaceholder.typicode.com/posts/1'
  ).pipe(delay(2000));
}

Observables that emit multiple values:

Each one can emit at any time; the type property tells the event type and what happened.

// interval() comes from 'rxjs' and map() from 'rxjs/operators'
getMultipleOne(): Observable<any> {
  // Emit an 'exit' event every 3 seconds
  return interval(3000).pipe(map(() => ({type: 'exit'})));
}

getMultipleTwo(): Observable<any> {
  // Emit a 'loaderror' event every 2 seconds
  return interval(2000).pipe(map(() => ({type: 'loaderror', url: 'http://www.blabla.com'})));
}

getMultipleThree(): Observable<any> {
  // The url returned can be any, I'm waiting for mine.
  return interval(1000).pipe(map(() => ({type: 'loadstart', url: 'http://www.google.com'})));
}

The flow that I'm trying to accomplish is:

  1. Call getOrderId, which emits one value, and switch to another observable.
  2. Listen to the values of the three multi-value observables at the same time. I will react to those events and do some business logic.
  3. If the type is exit, cancel all subscriptions and don't notify the caller. This means that the user just closed/canceled manually.
  4. If the type is loadstart, check the url to see if it's mine and switch to another HTTP request; otherwise, cancel everything.
  5. If it's my url, call the backend to save the results and notify the caller to show a nice UI result.

I have tried:

initFlow() {
    return this.getOrderId() // Emit just one value
        .pipe(
            // Switch to another one that is the merge of 3 observables
            switchMap(() => {
                return merge(
                    this.getMultipleOne(),
                    this.getMultipleTwo(),
                    this.getMultipleThree()
                );
            }),
            // This tap will be executed every time the above observables emit something
            tap(event => console.log(event)),
            tap(event => {
                if (event.type === 'exit') {
                    // Cancel all previous observables if this happens?
                    // Since the flow is not completed, the caller must not be notified, I guess?
                }
            }),
            filter(event => event.type === 'loadstart'),
            pluck('url'),
            // If the condition is not met, cancel all of the above without notifying the caller
            switchMap(url => this.isGoogle(url) ? EMPTY : of(url)),
            // Switch to another HTTP that emits one value or throw error to cancel all
            switchMap(url => {
                if (this.isMyPersonalWebSite(url)) {
                    return this.notifyBackend(url); // Emit just one value
                } else {
                    // Let the caller be notified by its error event to show UI
                    throw new Error('User transaction is failed');
                }
            }),
            // The previous HTTP request emits just one value; this allows the caller to receive the completion event.
            // The caller does not need it, but I guess it's the right thing to happen in an observable's life.
            first(),
        );
}

The caller:

// This one will be unsubscribed later by the component when its onDestroy hook is called
this.myService.initFlow()
    .subscribe({
        next: res => console.log('next', res),
        error: e => console.log('error', e),
        complete: () => console.log('complete')
    });

My issues:

  1. In the tap that checks for the exit type, I don't know how to cancel all the previous observables that I have combined using merge.
  2. When I return EMPTY, I see nothing, and the caller doesn't receive the complete event either. Is this normal?

My goals:

  1. Cancel all the observables that emit multiple values if the condition is met.
  2. Notify the caller using its next event when notifyBackend() emits and using its error event when I throw the error manually in the code.

Any help or advice to avoid memory leaks and a way to handle this would be appreciated.

Upvotes: 2

Views: 2740

Answers (1)

martin

Reputation: 96891

If you want to complete all merged Observables, you'll need to complete merge() inside switchMap() (I didn't test this code):

switchMap(() => {
  return merge(
    this.getMultipleOne(),
    this.getMultipleTwo(),
    this.getMultipleThree(),
  ).pipe(
    takeWhile(event => event.type !== 'exit'),
  );
}),
tap(event => {
  if (event.type === 'loaderror') {
    throw new Error(); // Will be wrapped into an `error` notification.
  }
}),
switchMap(event => {
  // At this point `event.type` must be `loadstart`
  if (this.isGoogle(event.url)) {
    return EMPTY;
  }

  if (this.isMyPersonalWebSite(event.url)) {
    return this.notifyBackend(event.url); // Emit just one value
  } else {
    // Let the caller be notified by its error event to show UI
    throw new Error('User transaction is failed');
  }
}),
take(1), // You shouldn't even need to use this
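
The effect of placing takeWhile() inside switchMap() can be checked with a small synchronous sketch. It is only an illustration under some assumptions: the Subjects exit$ and loadStart$ are hypothetical stand-ins for the browser events, of('order-1') stands in for getOrderId(), and the imports use the 'rxjs/operators' entry point (RxJS 6 and 7).

```typescript
import { Subject, merge, of } from 'rxjs';
import { finalize, switchMap, takeWhile } from 'rxjs/operators';

type BrowserEvent = { type: 'exit' | 'loaderror' | 'loadstart'; url?: string };

// Hypothetical stand-ins for the InAppBrowser event streams.
const exit$ = new Subject<BrowserEvent>();
const loadStart$ = new Subject<BrowserEvent>();

const seen: string[] = [];     // event types that reached the subscriber
const tornDown: string[] = []; // sources that were unsubscribed or completed
let completed = false;

of('order-1') // stand-in for getOrderId()
  .pipe(
    switchMap(() =>
      merge(
        exit$.pipe(finalize(() => tornDown.push('exit$'))),
        loadStart$.pipe(finalize(() => tornDown.push('loadStart$'))),
      ).pipe(
        // Completes the merged stream (and unsubscribes both sources) on 'exit'.
        takeWhile(event => event.type !== 'exit'),
      ),
    ),
  )
  .subscribe({
    next: event => seen.push(event.type),
    complete: () => { completed = true; },
  });

loadStart$.next({ type: 'loadstart', url: 'http://www.google.com' });
exit$.next({ type: 'exit' });
// Emitted after teardown, so nothing is listening any more.
loadStart$.next({ type: 'loadstart', url: 'http://www.google.com' });

console.log(seen, tornDown, completed);
```

Here the 'exit' event itself is not forwarded, both sources are released as soon as it arrives, and the subscriber gets a complete notification because the outer of() has already completed.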

Regarding your questions:

  • To cancel all merged Observables, you need to complete the merged chain inside switchMap() (I'm using takeWhile here).

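  • In your chain, returning EMPTY from switchMap() only means that nothing is emitted for that event. The merged source is still running, so the chain doesn't complete and the caller sees neither next nor complete. Also note that your chain ends with first(): if the chain completes without emitting anything, first() will emit an error, while take(1) simply completes. See "take(1) vs first()".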
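
To make the EMPTY and first() behavior concrete, here is a minimal sketch. It assumes a hypothetical Subject url$ in place of the real event stream, so that emissions and completion can be triggered by hand, and it imports from 'rxjs/operators' (RxJS 6 and 7).

```typescript
import { EMPTY, EmptyError, Subject } from 'rxjs';
import { first, switchMap, take } from 'rxjs/operators';

// Hypothetical stand-in for the stream of loadstart urls.
const url$ = new Subject<string>();
const log: string[] = [];

// Chain ending with first(): errors if the source completes without a value.
url$.pipe(switchMap(() => EMPTY), first()).subscribe({
  next: () => log.push('first: next'),
  error: err => log.push(err instanceof EmptyError ? 'first: EmptyError' : 'first: other error'),
  complete: () => log.push('first: complete'),
});

// Same chain ending with take(1): completes quietly in that case.
url$.pipe(switchMap(() => EMPTY), take(1)).subscribe({
  next: () => log.push('take(1): next'),
  error: () => log.push('take(1): error'),
  complete: () => log.push('take(1): complete'),
});

// Switching to EMPTY while the source is still open produces no notification.
url$.next('http://www.google.com');
const logWhileSourceOpen = [...log];

// Only when the source itself completes do the two operators differ.
url$.complete();

console.log(logWhileSourceOpen, log);
```

While url$ is open, neither subscriber receives anything, which matches the "I see nothing" observation in the question. Once the source completes, the first() chain reports an EmptyError and the take(1) chain just completes.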

Upvotes: 0
