Reputation: 155
I have an observable which tracks changes to a form. When the form is updated it should autosave by sending an HTTP PUT request to the server. The behaviour I want is this: the first change sends an HTTP request immediately. While that request is in flight, further changes send nothing. Once the request has completed, only the most recent update is sent.
I have thought about using exhaustMap e.g. formChanges$.pipe(exhaustMap(data => sendHttpUpdate(data)));
as this would ignore all updates while a request is in progress. However, every update emitted while a request is in progress is lost, so I would need to retrigger the formChanges$ observable to send the latest update, or it would never be sent.
I also considered concatMap e.g. formChanges$.pipe(concatMap(data => sendHttpUpdate(data)));
as this waits for the previous request to complete, but it sends all updates in order rather than just the latest one.
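To make the difference between these strategies concrete, here is a rough, dependency-free sketch that models which values would reach the server. It is not RxJS itself: `simulate`, `FormChange`, `SendStrategy`, the timings and the fixed request duration are all made up for illustration, and "latest" stands for the behaviour being asked for.

```typescript
// Dependency-free model of the three strategies; this is NOT RxJS itself.
// All names and timings here are illustrative assumptions.
type SendStrategy = "exhaust" | "concat" | "latest";

interface FormChange {
  time: number;  // when the form change happens (ms)
  value: string; // the form value at that time
}

// Returns the values that would be sent to the server, in order,
// assuming every request takes exactly `requestMs` to complete.
function simulate(strategy: SendStrategy, changes: FormChange[], requestMs: number): string[] {
  const sent: string[] = [];
  let busyUntil = -Infinity;  // time at which the in-flight request completes
  let pending: string[] = []; // values waiting for the in-flight request

  // Send waiting values whose turn comes up at or before `now`.
  const drain = (now: number): void => {
    while (pending.length > 0 && busyUntil <= now) {
      sent.push(pending.shift() as string);
      busyUntil += requestMs; // the next request starts when the previous one ends
    }
  };

  for (const { time, value } of changes) {
    drain(time);
    if (busyUntil <= time) {
      sent.push(value);              // idle: send immediately
      busyUntil = time + requestMs;
    } else if (strategy === "concat") {
      pending.push(value);           // busy: queue every value
    } else if (strategy === "latest") {
      pending = [value];             // busy: keep only the newest value
    }
    // "exhaust": busy, so the value is dropped
  }
  drain(Infinity); // flush whatever is still waiting
  return sent;
}

const changes: FormChange[] = [
  { time: 0, value: "a" },
  { time: 10, value: "b" },
  { time: 20, value: "c" },
  { time: 30, value: "d" },
  { time: 250, value: "e" },
];

console.log(simulate("exhaust", changes, 100)); // [ 'a', 'e' ]
console.log(simulate("concat", changes, 100));  // [ 'a', 'b', 'c', 'd', 'e' ]
console.log(simulate("latest", changes, 100));  // [ 'a', 'd', 'e' ]
```

With a 100 ms request, "exhaust" never sends "d" (the last edit of the first burst), "concat" sends the stale "b" and "c", and "latest" sends the first value plus the newest one that arrived while the request was in flight.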
I would like a pure RxJS solution, i.e. one using Observables, Subjects and operators, without storing state in a variable. Am I missing an obvious operator or combination of operators that could achieve this?
Here is my best attempt so far:
const $send = new Subject();
formChanges$.pipe(
sample($send),
switchMap(this.fakeHttpRequest),
)
.subscribe(handleResponse);
formChanges$.pipe(first())
.subscribe(() => $send.next());
where fakeHttpRequest is a function which takes the value emitted by formChanges$ and returns an Observable of the HTTP request.
This is unsatisfactory as I have to manually trigger the first send after the first form change and I feel it doesn't convey the intention of the code very well.
Upvotes: 3
Views: 1733
Reputation: 155
I can't believe it has taken me 9 months to figure this out but in case anyone else is tearing their hair out over this:
As I realised, none of the map operators (concatMap, exhaustMap) does exactly what is required, so you need to handle the mapping, and the choice of which values get subscribed to, yourself. The solution is to map each form value into an HTTP request observable (not subscribed yet, so no request is sent). Then hand this observable to throttle by returning it from throttle's duration selector. throttle subscribes to that observable and ignores values from the original source until it emits or completes. throttle also takes a configuration parameter which lets you define the behaviour of leading and trailing values, making it very flexible. In this case we want both, because we want the result of the first request and the result of the last buffered request.
From this we now have an observable with all the source values we want, i.e. the request data. However, we want the response data! Simply subscribing again would result in duplicate HTTP calls (which may return a different result). Fear not, as this is where shareReplay comes to the rescue. If we pipe the original request observable through shareReplay, then subsequent subscriptions to the observable will just get the original response rather than making a new call. Since we have an observable of observables, we want to flatten it using the mergeAll operator. We can now subscribe to this to get the results of our HTTP requests.
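The duplicate-call problem can be pictured without RxJS. The sketch below is only an analogy using plain functions: `coldRequest` stands in for an unshared request observable (each "subscription" repeats the call) and the hypothetical helper `shareOnce` plays roughly the role of shareReplay(1) by running the request once and replaying the cached result. None of these names come from RxJS.

```typescript
// Analogy only: plain functions standing in for observables (not RxJS).
let callCount = 0; // counts how many simulated HTTP calls are made

// Stand-in for a cold request observable: every invocation repeats the call.
const coldRequest = (): string => {
  callCount += 1;
  return `response #${callCount}`;
};

// Stand-in for shareReplay(1): run the request once, then replay the cached result.
function shareOnce<T>(request: () => T): () => T {
  let cache: { value: T } | undefined;
  return () => {
    if (cache === undefined) {
      cache = { value: request() };
    }
    return cache.value;
  };
}

const shared = shareOnce(coldRequest);
const first = shared();  // triggers the one and only call
const second = shared(); // served from the cache, no second call

console.log(first, second, callCount); // response #1 response #1 1
```

In the real pipeline the two "subscribers" are throttle (which needs to know when the request finishes) and mergeAll (which delivers the response).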
Putting this all together:
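formChanges$.pipe(
  // Build a shared request observable per form value; nothing is sent until it is subscribed to
  map(formValues => this.fakeHttpRequest(formValues).pipe(shareReplay(1))),
  // Use the request itself as the throttle duration; keep the first and the latest value
  throttle(x => x, {leading: true, trailing: true}),
  // Flatten the request observables into their responses
  mergeAll(),
).subscribe((result) => console.log(result));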
Using insider knowledge I assumed you cared about the result. If this wasn't true then this would be simpler. We could simply let throttle send the requests and we wouldn't have to worry about duplicate requests:
formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues)),
  // throttle subscribes to each request observable, which is what sends the request
  throttle(x => x, {leading: true, trailing: true}),
).subscribe();
Upvotes: 2
Reputation: 374
hmm if i understand it something like this could work:
changeStarted: Subject<boolean>; // Tracks when a save has started
changeComplete: Subject<boolean>; // Tracks when a save has finished
// firstCall is assumed to be a boolean field declared elsewhere on the class
constructor() {
  this.changeStarted = new Subject<boolean>();
  this.changeComplete = new Subject<boolean>();
  formChanges$.pipe(
    // Once a change has started, buffer until the change is completed
    bufferToggle(this.changeStarted, () => this.firstCall ? interval(1) : this.changeComplete)
  ).subscribe(x => {
    this.changeStarted.next(true); // signal to start buffering
    // Make the HTTP request with the most recent buffered value
    this.fakeHttpRequest(x[x.length - 1]).subscribe(() => this.changeComplete.next(true));
  });
}
Upvotes: 0