Reputation: 2037
I have the following setup, which makes a new HTTP request to a server every 3 seconds.
getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    switchMap(() => this.http.get(param1))
  );
}
If a given request takes more than 3 seconds, the switchMap() (I think) will cancel it and fire off a new one.
Now, I want to make it so that if a request is taking more than 3 seconds, it waits for that request to complete before firing off another one. Just for context, the idea is that if there are performance issues with the requests, my front-end is not stuck firing off and cancelling requests too early.
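To make the cancellation problem concrete, here is a rough timing model of the timer() + switchMap() setup. It is only a sketch: it uses plain arithmetic instead of RxJS or real HTTP calls, the names simulateSwitchMap and PollResult are made up for illustration, and it assumes every request takes the same time and that a response arriving exactly on a tick counts as cancelled.

```typescript
// Timing model only: all times are in milliseconds, nothing is actually requested.
interface PollResult {
  started: number[];   // times at which a request was started
  completed: number[]; // times at which a request finished without being cancelled
}

// Models timer(0, period) + switchMap where every request takes `latency` ms
// and the timer emits `ticks` times before stopping.
function simulateSwitchMap(period: number, latency: number, ticks: number): PollResult {
  const started: number[] = [];
  const completed: number[] = [];
  for (let i = 0; i < ticks; i++) {
    const start = i * period;
    const nextTick = (i + 1) * period;
    const isLastTick = i === ticks - 1;
    started.push(start);
    // A request survives only if it finishes before the next tick replaces it
    // (assumption: a tie counts as cancelled). After the last tick nothing cancels it.
    if (isLastTick || start + latency < nextTick) {
      completed.push(start + latency);
    }
  }
  return { started, completed };
}

const slow = simulateSwitchMap(3000, 4000, 4);
console.log(slow.started);   // [0, 3000, 6000, 9000]
console.log(slow.completed); // [13000]
```

With 4-second responses, only the request started by the final tick ever completes in this model. With an endless timer, none would.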
I somewhat got this to work with the following:
currentObs: Observable<any>;
getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1);
      return this.currentObs;
    })
  );
}
This keeps track of currentObs, which is the observable of the current HTTP request. It is then passed to throttle() so that the values from timer() that would normally prompt new requests are ignored until the request (currentObs) completes.
This seems to work, but it's a bit awkward because I need to keep some of the state outside the pipe(). It's also a bit confusing because the throttling is based on an event that happens after it. I've been looking for a way to pass the result of the switchMap() on to the throttle(), but first, I didn't find one, and second, wouldn't that put the throttle() on the wrong side of the pipe?
Is there a neater way to achieve this using RxJS?
With @Mrk Sef's answer for a more elegant solution and @kvetis' warning about handling errors, I ended up with the following pipe. It makes a request, waits 3 seconds after a success, and then makes another request. If the request fails, it also waits 3 seconds, makes another request, and then starts again from the top.
getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    )),
    retryWhen(s => s.pipe(
      delay(3000)
    ))
  );
}
Upvotes: 1
Views: 673
Reputation: 8062
Try to run this.http.get every 3 seconds; if the previous call isn't done within 3 seconds, do nothing and try again 3 seconds later.
getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    exhaustMap(() => this.http.get(param1))
  );
}
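As a rough illustration of what this variant does to the schedule, here is a small timing model. It is a sketch in plain TypeScript, not RxJS: simulateExhaustMap is a made-up helper, the latencies are invented, and a request that finishes exactly on a tick is assumed to free that tick.

```typescript
// Timing model only (milliseconds). Models timer(0, period) + exhaustMap:
// a tick that arrives while a request is still in flight is simply dropped.
function simulateExhaustMap(period: number, latencies: number[], ticks: number): number[] {
  const started: number[] = [];
  let busyUntil = 0; // time at which the request in flight finishes
  let next = 0;      // index of the next latency to use
  for (let i = 0; i < ticks && next < latencies.length; i++) {
    const now = i * period;
    if (now >= busyUntil) {
      // No request in flight (assumption: a tie counts as free), so start one.
      started.push(now);
      busyUntil = now + (latencies[next] ?? 0);
      next++;
    }
    // Otherwise the tick is ignored and the next attempt is one period later.
  }
  return started;
}

// First request takes 4 s, so the tick at 3 s is skipped.
console.log(simulateExhaustMap(3000, [4000, 1000, 1000], 5)); // [0, 6000, 9000]
```

The trade-off visible here is that a request which overruns by 1 second costs a whole extra period before the next attempt.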
Whenever the previous call ends, wait 3 seconds and then make the call again.
getData(param1: string): Observable<any> {
  return this.http.get(param1).pipe(
    repeatWhen(s => s.pipe(
      delay(3000)
    ))
  );
}
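In this variant the gap is measured from the end of one call to the start of the next, so the polling period stretches with the response time. A minimal timing model, assuming made-up latencies and a hypothetical helper named simulateRepeatAfterDelay (plain TypeScript, no RxJS involved):

```typescript
// Timing model only (milliseconds). Each request starts `delayMs` after the
// previous one has finished, however long that previous request took.
function simulateRepeatAfterDelay(delayMs: number, latencies: number[]): number[] {
  const started: number[] = [];
  let now = 0;
  for (const latency of latencies) {
    started.push(now);
    now += latency + delayMs; // wait for the response, then wait the fixed delay
  }
  return started;
}

console.log(simulateRepeatAfterDelay(3000, [1000, 4000, 500])); // [0, 4000, 11000]
```

Requests never overlap and are never cancelled, but the server is hit less often than once every 3 seconds.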
Repeat the call every 3 seconds, unless the call takes longer than 3 seconds, in which case repeat the call as soon as the previous call ends.
This is closest to what you described. It works by using a silent timer to artificially "extend" the HTTP call. This works because merge won't complete until both inner observables complete, which means the merge cannot complete in less than 3 seconds.
getData(param1: string): Observable<any> {
  return merge(
    this.http.get(param1),
    timer(3000).pipe(
      filter(_ => false)
    )
  ).pipe(
    repeatWhen(s => s)
  );
}
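Put as arithmetic, each cycle of the merge lasts max(request time, 3 seconds), and the next request starts as soon as the cycle ends. A small sketch of that schedule, with invented latencies and a hypothetical helper name (simulateMergeWithTimer); it models the timing only and does not use RxJS:

```typescript
// Timing model only (milliseconds). The merged observable completes when both
// the request and the silent timer have completed, i.e. after
// max(latency, period), and the next request starts right then.
function simulateMergeWithTimer(period: number, latencies: number[]): number[] {
  const started: number[] = [];
  let now = 0;
  for (const latency of latencies) {
    started.push(now);
    now += Math.max(latency, period);
  }
  return started;
}

// Fast calls keep the 3 s rhythm; the 4 s call pushes the next start to 7 s.
console.log(simulateMergeWithTimer(3000, [1000, 4000, 500])); // [0, 3000, 7000]
```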
Upvotes: 1
Reputation: 7351
Your solution is about as elegant as it gets. You could get your hands dirtier by stepping outside the world of observables and keeping the state outside in a simple callback or whatever. But I would say you solved the issue correctly.
Just be careful: if the request fails, then the whole timer fails. You need to recover in both switchMap and currentObs if you want to continue with the next request even if the previous one fails. Since throttle needs to receive a value for the pipe to continue, you should not just recover to EMPTY. Let's emit null.
getData(param1: string): Observable<any> {
  return timer(0, 3000).pipe(
    throttle(_ => this.currentObs),
    switchMap(() => {
      this.currentObs = this.http.get(param1).pipe(
        catchError(e => {
          console.error(e);
          return of(null); // so the throttle continues with next value
        })
      );
      return this.currentObs;
    }),
    filter(identity) // use identity from RxJS so we filter out the null
  );
}
Generally speaking, what you're trying to achieve is called backpressure. You can google "RxJS backpressure" and come up with different techniques. In most of them, you cannot achieve what you want without having an external Observable to feed info back to the source Observable.
Upvotes: 1