Reputation: 7838
I'm using RxJS's .auditTime(500) operator as a trailing throttle: I want to make a server call at most once every 500 ms.
Downstream, when the server call completes, I need to know whether more server calls are pending or whether the buffer is clear for now, so that I can show users status messages like "Saving…" and "Saved".
Here's roughly what that looks like.
saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .map(response => response.data)
  .do(data => {
    // here I want to know whether there are pending values from the
    // auditTime() operation above or if the buffer is currently clear
    const pendingSaves = ???;
    if (!pendingSaves) {
      setStatus(Status.SAVED);
    }
  })
  .subscribe();
As you can see in the final .do() operation, I want to know whether there are pending values from the .auditTime(500) operation. How can I achieve something like this?
Cheers! 🙏
Upvotes: 2
Views: 724
Reputation: 7838
Incrementing and decrementing a counter was too susceptible to bugs, so I ended up going with a wholly different approach. I now separately track whether the local data is "dirty." I use this dirty signal to show a "Saving…" vs "Saved" message to the user:
Each edit sets dirty to true, and a save that matches the local state sets dirty to false.

Set dirty to true on each edit

Here I define an Rx.Subject that receives a signal each time the user makes an edit. Each time it receives a signal, I set dirty to true.
// stream of signals to save the active document
const userEditSignal$ = new Rx.Subject();
const savePrototype = () => {
  userEditSignal$.next();
};
userEditSignal$.subscribe(() => {
  // runs for each call to save the active document
  store.commit("SET_DIRTY", true);
});
Observe dirty state to decide when to save to server

This lets us know every time the dirty value changes, which is not the same as every time it is set.
const observeState = (store, getter) => {
  // irrelevant details redacted
};
// emits only when `dirty` changes, not every time it's set
const shouldSaveToServer$ = observeState(store, state => state.dirty);
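The difference between "changes" and "is set" can be shown without RxJS. The following is a minimal sketch in plain JavaScript, not the redacted observeState implementation; createChangeNotifier and its callback are hypothetical names introduced only for illustration.

```javascript
// Hypothetical helper: calls `onChange` only when the observed value differs
// from the previously observed one (similar in spirit to distinctUntilChanged).
function createChangeNotifier(onChange) {
  let hasPrevious = false;
  let previous;
  return function observe(value) {
    if (hasPrevious && value === previous) {
      return; // same value set again: ignore
    }
    hasPrevious = true;
    previous = value;
    onChange(value);
  };
}

// Usage: `dirty` is set five times but only changes three times.
const seen = [];
const observeDirty = createChangeNotifier(value => seen.push(value));
[true, true, false, true, true].forEach(observeDirty);
console.log(seen); // [ true, false, true ]
```

Setting dirty to true repeatedly during a burst of edits therefore produces a single signal, which is what keeps the save timer below from restarting on every keystroke.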
This custom timing logic replaces the need for the auditTime() operator.
const saveToServerSignal$ = shouldSaveToServer$.switchMap(shouldSave => {
  return shouldSave ?
    // as long as we should save, save every 500 ms
    Rx.Observable.interval(500) :
    // when we should not, stop
    Rx.Observable.never();
});
// create a request object for each save-to-server signal
const saveRequest$ = saveToServerSignal$
  // read the document at emission time; mapTo() would capture the value once, at setup
  .map(() => store.state.activeDocument)
  .map(createSaveRequest);

const saveResponse$ = saveRequest$
  // sends immediately
  .flatMap(request => axios(request));
When a response arrives, compare the saved document with the current active document. If they agree, we can set dirty to false.
saveResponse$
  .map(response => response.data)
  .do(savedDocument => {
    const activeDocument = store.state.activeDocument;
    // update just `created`, `modified`, and `user`
    store.commit({
      type: "UPDATE_ACTIVE_DOCUMENT",
      // irrelevant details omitted
    });
    // diff current state and saved document (function details omitted)
    const activeAndSavedDocsMatch = diff(activeDocument, savedDocument);
    if (activeAndSavedDocsMatch) {
      store.commit("SET_DIRTY", false);
    }
  })
  .subscribe();
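The comparison matters because the user may edit again while a request is in flight; the response then describes an older version, so dirty has to stay true. Here is a minimal sketch of that case in plain JavaScript, assuming documents are plain JSON-serializable objects. docsMatch, handleSaveResponse, and the state object are hypothetical stand-ins for the omitted diff function and store commits.

```javascript
// Hypothetical stand-in for the omitted diff function: a naive comparison
// that assumes plain objects with a stable key order.
function docsMatch(a, b) {
  return JSON.stringify(a) === JSON.stringify(b);
}

// Hypothetical local state in place of the store.
const state = { activeDocument: { text: "v1" }, dirty: true };

function handleSaveResponse(savedDocument) {
  // Clear the flag only if the server now holds what the user sees.
  if (docsMatch(state.activeDocument, savedDocument)) {
    state.dirty = false;
  }
}

// The user edits to "v2" while the save of "v1" is still in flight.
state.activeDocument = { text: "v2" };
handleSaveResponse({ text: "v1" });
const dirtyAfterStaleResponse = state.dirty; // still true

// The next save carries "v2", so its response matches.
handleSaveResponse({ text: "v2" });
const dirtyAfterFreshResponse = state.dirty; // false
```

Because dirty stays true after the stale response, the interval keeps running and the next tick sends the newer document.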
Upvotes: 0
Reputation: 96969
I think you could achieve what you want using scan and by slightly modifying your chain:
// assumes RxJS 5 with Observable and Subject in scope
const inc = new Subject();
const dec = new Subject();
const counter = Observable.merge(dec.mapTo(-1), inc.throttleTime(500).mapTo(1))
  .scan((acc, val) => acc + val, 0)
  .map(val => val > 0);
saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
    inc.next();
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .do(() => dec.next())
  .map(response => response.data)
  .withLatestFrom(counter, (data, pendingSaves) => {
    if (!pendingSaves) {
      setStatus(Status.SAVED);
    }
  })
  .subscribe();
The whole idea is in the counter Observable, which merges inc and dec. These two Subjects increment and decrement a counter using scan().

inc is also chained with .throttleTime(500), which does the opposite of .auditTime(500): it emits the first value in each 500 ms window rather than the last. When you call setStatus(Status.SAVING), you already know that .auditTime(500) will emit an item, so you can increment the counter right away.

Then withLatestFrom combines the latest value from counter with the result of the remote call, and that's the place where you can check the latest emission from counter.
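The counter logic can be traced by hand. The following plain JavaScript sketch (no RxJS) replays a made-up sequence of +1 and -1 signals through the same accumulator passed to scan() and maps each running total to the pendingSaves boolean. The signal sequence is hypothetical and only illustrates the arithmetic, not real timing.

```javascript
// Same accumulator and mapping as in the counter Observable above.
const accumulate = (acc, val) => acc + val;
const isPending = total => total > 0;

// Hypothetical signal sequence: +1 from inc (after throttling), -1 from dec.
const signals = [+1, -1, +1, +1, -1, -1];

let total = 0;
const pendingHistory = signals.map(val => {
  total = accumulate(total, val);
  return isPending(total);
});

console.log(pendingHistory); // [ true, false, true, true, true, false ]
```

The status may switch to "Saved" only at the points where the running total is back at zero.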
Upvotes: 1