Adrian Hristov

Reputation: 2037

Throttle based on a previous Observable

I have the following setup, which makes a new HTTP request to a server every 3 seconds.

  getData(param1: string): Observable<any> {
    return timer(0, 3000).pipe(
      switchMap(() => this.http.get(param1))
    );
  }

If a given request takes more than 3 seconds, the switchMap() (I think) will cancel it and fire off a new one.
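To make that cancellation behaviour concrete, here is a minimal timing model in plain TypeScript (no RxJS involved). It is only a sketch: `switchMapOutcomes` and `Outcome` are hypothetical names introduced for illustration, and it assumes that a response arriving in the same millisecond as the next timer tick counts as cancelled.

```typescript
// Plain-TypeScript timing model (no RxJS): which requests survive under
// timer(0, interval) + switchMap, given each request's duration in ms.
type Outcome = "completed" | "cancelled";

function switchMapOutcomes(durations: number[], interval: number): Outcome[] {
  // Request k starts at k * interval. The next tick arrives `interval` ms later
  // and makes switchMap unsubscribe from the request if it is still in flight.
  // Assumption: a tie (duration === interval) is treated as cancelled.
  return durations.map(d => (d < interval ? "completed" : "cancelled"));
}

console.log(switchMapOutcomes([1000, 4000, 2000, 5000], 3000));
// completed, cancelled, completed, cancelled
```

If every request is slower than the interval, the model reports every request as cancelled, which is the situation I want to avoid.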

Now, I want a request that takes more than 3 seconds to complete before another one is fired off. For context, the idea is that if there are performance issues with the requests, my front-end does not get stuck firing off requests and cancelling them too early.

I somewhat got this to work with the following:

currentObs: Observable<any>;

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1)
      return this.currentObs;
    })
  );
}

This will keep track of the currentObs which is the observable of the current HTTP request. It then passes it to a throttle() method so that the values from timer() that normally prompt new requests are ignored until the request (currentObs) completes.

This seems to work, but it's a bit awkward because I need to keep some of the state outside the pipe(). It's also a bit confusing because the throttling is based on an event that happens after it. I've been looking for a way to pass the result of the switchMap() to the throttle(), but first, I didn't find one, and second, wouldn't that put the throttle() on the wrong side of the pipe?

Is there a neater way to achieve this using RxJS?

Edit:

Using @Mrk Sef's answer for a more elegant solution and @kvetis' warning about handling errors, I ended up with the following pipe. It makes a request, waits 3 seconds after a success, and then makes another request. If the request fails, it also waits 3 seconds and then starts again from the top.

getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    )),
    retryWhen(s => s.pipe(
      delay(3000)
    ))
  );
}

Upvotes: 1

Views: 673

Answers (2)

Mrk Sef

Reputation: 8062

ExhaustMap

Try to run this.http.get every 3 seconds. If the previous call isn't done when the next tick arrives, do nothing and try again 3 seconds later.

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    exhaustMap(() => this.http.get(param1))
  );
}

Repeat with Delay

Whenever the previous call ends, wait 3 seconds and then make the call again.

getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    ))
  );
}

Comparative Repeat

Repeat the call every 3 seconds, unless the call takes longer than 3 seconds, in which case repeat the call as soon as the previous call ends.

This is closest to what you described. It works by using a silent timer to artificially "extend" the HTTP call. This works because merge won't complete until both inner observables complete, so each round takes at least 3 seconds.

getData(param1: string): Observable<any> {
  return merge(
    this.http.get(param1),
    timer(3000).pipe(
      filter(_ => false)
    )
  ).pipe(
    repeatWhen(s => s)
  );
}
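To compare the three options, here is a rough timing model in plain TypeScript (no RxJS involved). It is a sketch under simplifying assumptions: every request takes exactly `duration` ms, nothing fails, and a request finishing in the same millisecond as a timer tick counts as finished. The function names are hypothetical and exist only for this illustration.

```typescript
// Plain-TypeScript timing model (no RxJS). Each function returns the start
// time (ms) of the first `count` requests for one of the three strategies.

// timer(0, interval) + exhaustMap: ticks arriving while a request is in flight are dropped.
function exhaustMapStarts(duration: number, interval: number, count: number): number[] {
  if (interval <= 0) throw new Error("interval must be positive");
  const starts: number[] = [];
  let busyUntil = 0; // time at which the in-flight request completes
  for (let tick = 0; starts.length < count; tick += interval) {
    if (tick >= busyUntil) {
      starts.push(tick);
      busyUntil = tick + duration;
    }
  }
  return starts;
}

// repeatWhen + delay(interval): the next request starts `interval` ms after the previous one ends.
function repeatWithDelayStarts(duration: number, interval: number, count: number): number[] {
  return Array.from({ length: count }, (_, k) => k * (duration + interval));
}

// merge(request, silent timer) + repeatWhen: each round lasts max(duration, interval).
function comparativeRepeatStarts(duration: number, interval: number, count: number): number[] {
  return Array.from({ length: count }, (_, k) => k * Math.max(duration, interval));
}

// A slow request (4000 ms) against a 3000 ms interval:
console.log(exhaustMapStarts(4000, 3000, 3));        // 0, 6000, 12000
console.log(repeatWithDelayStarts(4000, 3000, 3));   // 0, 7000, 14000
console.log(comparativeRepeatStarts(4000, 3000, 3)); // 0, 4000, 8000
```

In this model, a fast request (say 1000 ms) starts at 0, 3000, 6000 under both exhaustMap and the comparative repeat, so those two only differ once a request is slower than the interval.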

Upvotes: 1

kvetis

Reputation: 7351

Your solution is about as elegant as it gets. You could get your hands dirtier by stepping outside the world of observables and keeping the state in a simple callback or whatever, but I would say you solved the issue correctly.

Just be careful: if the request fails, the whole timer fails. You need to recover in both switchMap and currentObs if you want to continue with the next request even when the previous one fails. Since throttle needs to receive a value for the pipe to continue, you should not just recover to EMPTY. Let's emit null instead.

getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1).pipe(
        catchError(e => {
          console.error(e);
          return of(null); // so the throttle continues with next value
        })
      );
      return this.currentObs;
    }),
    // identity from RxJS filters out the null (and any other falsy value)
    filter(identity)
  );
}

Generally speaking, what you're trying to achieve is called backpressure. You can google "RxJS backpressure" and come up with different techniques. In most of them, you cannot achieve what you want without having an external Observable to feed info back to the source Observable.

Upvotes: 1
