Chris Calo

In RxJS is it possible to test for pending values in an auditTime() operation?

I'm using RxJS's .auditTime(500) operator as a trailing throttle: I want to make a server call at most once every 500 ms.

Downstream, when a server call completes, I need to know whether more server calls are pending or whether the buffer is clear for now, so that I can show users status messages like "Saving…" and "Saved".

Here's roughly what that looks like.

saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .map(response => response.data)
  .do(data => {
    // here I want to know whether there are pending values from the
    // auditTime() operation above or if the buffer is currently clear
    const pendingSaves = ???;
    if (!pendingSaves) {
      setStatus(Status.SAVED);
    }
  })
  .subscribe();

As you can see in the final .do() operation, I want to know whether there are pending values from the .auditTime(500) operation. How can I achieve something like this?

Cheers! 🙏

Upvotes: 2

Views: 724

Answers (2)

Chris Calo

Incrementing and decrementing a counter was too susceptible to bugs, so I ended up going with a wholly different approach. I now separately track whether the local data is "dirty." I use this dirty signal to show a "Saving…" vs "Saved" message to the user:

  1. Every time the user makes a local edit, I locally update the data and set dirty to true.
  2. After each save operation, the server responds with the latest version of the data it has.
  3. Upon receiving that response, I diff the local version of the data with what gets returned by the server, and if they match I set dirty to false.
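The three steps above can be sketched in plain JavaScript. This is only an illustration under assumptions: the `state` object, `applyLocalEdit`, `docsMatch`, and `handleSaveResponse` are hypothetical names that stand in for the store and the diff function, and the JSON comparison is a placeholder for a real deep comparison.

```javascript
// Hypothetical in-memory stand-in for the store; names are illustrative only.
const state = { doc: { text: "" }, dirty: false };

// Step 1: a local edit updates the data and marks it dirty.
function applyLocalEdit(text) {
  state.doc = { text };
  state.dirty = true;
}

// Assumed equality check; a real app would use a proper deep comparison.
function docsMatch(a, b) {
  return JSON.stringify(a) === JSON.stringify(b);
}

// Steps 2 and 3: compare the server's copy with the local copy and
// clear the flag only when they agree.
function handleSaveResponse(savedDoc) {
  if (docsMatch(state.doc, savedDoc)) {
    state.dirty = false;
  }
}

applyLocalEdit("hello");
const inFlight = { ...state.doc };  // snapshot sent to the server
applyLocalEdit("hello world");      // user keeps typing during the request
handleSaveResponse(inFlight);       // stale response: still dirty
console.log(state.dirty);           // true
handleSaveResponse({ text: "hello world" });
console.log(state.dirty);           // false
```

Because the flag is cleared only on a match, a response for an older version of the document can never show "Saved" while newer edits are still unsaved.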

Setting dirty to true on each edit

Here I define an Rx.Subject that receives a signal each time the user makes an edit. On each signal, I set dirty to true.

// stream of signals to save the active document
const userEditSignal$ = new Rx.Subject();

const savePrototype = () => {
  userEditSignal$.next();
};

userEditSignal$.subscribe(() => {
  // runs for each call to save the active document
  store.commit("SET_DIRTY", true);
});

Observe the dirty state to decide when to save to server

This lets us know every time the dirty value changes, which is not the same as every time it is set.

const observeState = (store, getter) => {
  // irrelevant details redacted
}

// emits only when `dirty` changes, not every time it's set
const shouldSaveToServer$ = observeState(store, state => state.dirty);
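To illustrate the difference between "every time it changes" and "every time it is set", here is a minimal plain-JavaScript sketch. It is not the redacted observeState implementation; `createChangeNotifier` is a hypothetical name used only for this example.

```javascript
// Hypothetical helper: calls `listener` only when the value differs
// from the previously seen value.
function createChangeNotifier(listener) {
  let hasValue = false;
  let last;
  return function set(value) {
    if (hasValue && value === last) return; // same value: stay silent
    hasValue = true;
    last = value;
    listener(value);
  };
}

const seen = [];
const setDirty = createChangeNotifier(value => seen.push(value));

// five sets, but only three changes
[true, true, false, false, true].forEach(value => setDirty(value));
console.log(seen); // [ true, false, true ]
```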

Create a stream of request objects and server responses

This custom timing logic removes the need for the auditTime() operator.

const saveToServerSignal$ = shouldSaveToServer$.switchMap(shouldSave => {
  return shouldSave ?
    // as long as we should save, save every 500 ms
    Rx.Observable.interval(500) :
    // when we should not, stop
    Rx.Observable.never();
});

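// create a request object for each save-to-server signal
const saveRequest$ = saveToServerSignal$
  // read the document at emission time; mapTo() would capture the value once, at setup
  .map(() => store.state.activeDocument)
  .map(createSaveRequest);

// send each request and emit the server's response
const saveResponse$ = saveRequest$
  // sends immediately
  .flatMap(request => axios(request));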
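The start/stop behavior of the switchMap() above can be sketched with plain timers. This is an illustration, not the code from the answer: `createSaveLoop`, `realTimers`, and `fakeTimers` are hypothetical names, and the sketch assumes the callback is invoked only when the dirty value actually changes.

```javascript
// Thin wrappers around the real timers so that they can be swapped out.
const realTimers = {
  start: (fn, ms) => setInterval(fn, ms),
  stop: handle => clearInterval(handle),
};

// Hypothetical helper: calls `save` every `periodMs` while dirty and
// stops when clean.
function createSaveLoop(save, periodMs, timers = realTimers) {
  let handle = null;
  // Assumes it is called only when `dirty` actually changes.
  return function onDirtyChange(dirty) {
    if (dirty && handle === null) {
      handle = timers.start(save, periodMs); // like Rx.Observable.interval(500)
    } else if (!dirty && handle !== null) {
      timers.stop(handle); // like switching to Rx.Observable.never()
      handle = null;
    }
  };
}

// Usage with a fake timer that records the callback instead of waiting.
let tick = null;
const fakeTimers = {
  start: fn => { tick = fn; return 1; },
  stop: () => { tick = null; },
};
let saveCount = 0;
const onDirtyChange = createSaveLoop(() => { saveCount += 1; }, 500, fakeTimers);

onDirtyChange(true);  // dirty: the loop starts
tick();
tick();
onDirtyChange(false); // clean: the loop stops
console.log(saveCount, tick === null); // 2 true
```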

On every response, diff the local document against the version returned from the server

If they agree, we can set dirty to false.

saveResponse$
  .map(response => response.data)
  .do(savedDocument => {
    const activeDocument = store.state.activeDocument;

    // update just `created`, `modified`, and `user`
    store.commit({
      type: "UPDATE_ACTIVE_DOCUMENT",
      // irrelevant details omitted
    });

    // diff current state and saved document (function details omitted)
    const activeAndSavedDocsMatch = diff(activeDocument, savedDocument);
    if (activeAndSavedDocsMatch) {
      store.commit("SET_DIRTY", false);
    }
  })
  .subscribe();

Upvotes: 0

martin

I think you could achieve what you want using scan and by slightly modifying your chain:

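// assumes RxJS 5, matching the chained operators used in the question
import { Observable, Subject } from "rxjs";

const inc = new Subject(); // signals that a save will be emitted
const dec = new Subject(); // signals that a save has completed

// running count of pending saves, mapped to a boolean
const counter = Observable.merge(dec.mapTo(-1), inc.throttleTime(500).mapTo(1))
    .scan((acc, val) => acc + val, 0)
    .map(val => val > 0);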

saveToServerObservable
  .do(() => {
    // gets called every time
    setStatus(Status.SAVING);
    inc.next();
  })
  .auditTime(500) // wait 500 ms and emit no more than once per 500 ms
  .flatMap(data => axios({
    method: "post",
    url: "/saveurl",
    data: data,
  }))
  .do(() => dec.next())
  .map(response => response.data)
  .withLatestFrom(counter, (data, pendingSaves) => {
    if (!pendingSaves) {
      setStatus(Status.SAVED);
    }
  })
  .subscribe();

The whole idea is in the counter Observable, which merges inc and dec. After mapTo(), inc contributes 1 and dec contributes -1, and scan() adds these into a running count of pending saves.
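As a rough illustration of what the scan() and map() steps compute, here is the same arithmetic on a plain array instead of a stream. The `deltas` sequence is made up for the example, and `runningTotals` is a hypothetical helper, not part of RxJS.

```javascript
// Plain-array stand-in for the merged stream: +1 per inc, -1 per dec.
const deltas = [+1, -1, +1, +1, -1, -1];

// Equivalent of .scan((acc, val) => acc + val, 0): keep every running total.
function runningTotals(values) {
  const totals = [];
  let acc = 0;
  for (const val of values) {
    acc += val;
    totals.push(acc);
  }
  return totals;
}

const totals = runningTotals(deltas);
// Equivalent of .map(val => val > 0): are any saves still pending?
const pending = totals.map(total => total > 0);

console.log(totals);  // [ 1, 0, 1, 2, 1, 0 ]
console.log(pending); // [ true, false, true, true, true, false ]
```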

inc is also chained with .throttleTime(500), which mirrors .auditTime(500): throttleTime emits at the start of each 500 ms window, while auditTime emits at the end of it. A call to setStatus(Status.SAVING) that opens a new window is guaranteed to make .auditTime(500) emit an item later, so you can increment the counter right away.
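The pairing of the two operators can be checked with a small simulation over timestamped events. This is a simplified model written for illustration, not RxJS itself: `simulateThrottleTime` and `simulateAuditTime` are hypothetical functions, the event list is invented, and the model assumes the source stays open and that no event lands exactly on a window boundary.

```javascript
// Leading edge: emit the first value, then ignore values for `duration` ms.
function simulateThrottleTime(events, duration) {
  const out = [];
  let openAt = -Infinity; // time at which the next value is accepted
  for (const { t, v } of events) {
    if (t >= openAt) {
      out.push({ t, v });
      openAt = t + duration;
    }
  }
  return out;
}

// Trailing edge: the first value starts a timer; when it fires, emit the
// most recent value seen.
function simulateAuditTime(events, duration) {
  const out = [];
  let fireAt = null;
  let latest;
  for (const { t, v } of events) {
    if (fireAt !== null && t >= fireAt) {
      out.push({ t: fireAt, v: latest });
      fireAt = null;
    }
    if (fireAt === null) fireAt = t + duration;
    latest = v;
  }
  if (fireAt !== null) out.push({ t: fireAt, v: latest });
  return out;
}

const events = [
  { t: 0, v: "a" }, { t: 100, v: "b" }, { t: 200, v: "c" }, { t: 700, v: "d" },
];
const throttled = simulateThrottleTime(events, 500);
const audited = simulateAuditTime(events, 500);

console.log(throttled.map(e => `${e.v}@${e.t}`)); // [ 'a@0', 'd@700' ]
console.log(audited.map(e => `${e.v}@${e.t}`));   // [ 'c@500', 'd@1200' ]
```

In this model both functions emit once per window, which is why each throttled inc can be matched with exactly one later save.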

Then withLatestFrom combines the result of the remote call with the most recent value from counter, and that's the place where you can check whether any saves are still pending.

Upvotes: 1
