ᴘᴀɴᴀʏɪᴏᴛɪs

Reputation: 7519

RxJS - Buffer unique values and emit when no other values have been emitted for x seconds?

A user is typing values into a form, and an event is emitted every time the user edits a field. The emitted value is the name of the field that was edited.

For example, a user typing three times in the description field and then twice in the name field would produce:

"description" => "description" => "description" => "name" => "name" => ...

I want to buffer unique values and emit them as an array when the user stops typing for x amount of seconds. A value may reappear again in a different buffer window.

Essentially this is to track which fields were updated when the user stopped typing and communicate with the server to save the edited values.
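
To make the intended grouping concrete, here is a small plain-JavaScript model over timestamped events. It is not an RxJS solution, just a sketch of the output I expect; `groupByQuietPeriod`, the `{ t, value }` event shape and the timestamps are made up for illustration.

```javascript
// Model of the desired grouping, not an RxJS solution.
// `groupByQuietPeriod` and the { t, value } event shape are assumptions for illustration.
function groupByQuietPeriod(events, quietMs) {
  const groups = [];
  let current = null; // Set of unique values in the open window
  let lastT = null;   // timestamp of the previous event

  for (const { t, value } of events) {
    // A gap of at least `quietMs` since the previous event closes the window.
    if (current !== null && t - lastT >= quietMs) {
      groups.push([...current]);
      current = null;
    }
    if (current === null) current = new Set();
    current.add(value);
    lastT = t;
  }

  // The last window closes once the final quiet period has passed.
  if (current !== null) groups.push([...current]);
  return groups;
}

const events = [
  { t: 0, value: 'description' },
  { t: 500, value: 'description' },
  { t: 900, value: 'name' },
  { t: 5000, value: 'name' }, // more than 3000 ms after the previous event
];

console.log(groupByQuietPeriod(events, 3000));
// [['description', 'name'], ['name']]
```

Note that "name" shows up in both arrays: uniqueness only applies within one buffer window.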


This is what I have so far. It emits every 3000 ms regardless of whether the user is still typing, and it doesn't prevent duplicates while buffering; instead, the array is deduplicated afterwards.

this.update$
  .bufferTime(3000)
  .filter(buffer => buffer.length > 0)
  .map(buffer => [...new Set(buffer)])
  .subscribe(x => console.log(x));

So it should listen until a value is emitted and then buffer unique values until no more values have been emitted for x seconds, then emit the buffer and repeat. How can one achieve this?

Upvotes: 0

Views: 2578

Answers (2)

Yoshi

Reputation: 54649

This could be an alternate version:

const { Observable } = Rx;

const log = (prefix) => (...args) => { console.log(prefix, ...args); };

const inputs = document.querySelectorAll('input');

const updates$ = Observable
  .fromEvent(inputs, 'input')
  .pluck('target', 'id')
;

// wait x ms after last update
const flush$ = updates$.debounceTime(3000);

const buffered$ = updates$
  // use `distinct` without `keySelector`, but reset with flush$
  .distinct(null, flush$)
  
  // flush the buffer using flush$ as `notifier`
  .buffer(flush$)
;

buffered$.subscribe(log('buffered$ =>'));

<script src="https://unpkg.com/@reactivex/rxjs@^5/dist/global/Rx.min.js"></script>

<div><input type="text" placeholder="foo" id="input.foo"></div>
<div><input type="text" placeholder="bar" id="input.bar"></div>
<div><input type="text" placeholder="baz" id="input.baz"></div>

Upvotes: 2

ᴘᴀɴᴀʏɪᴏᴛɪs

Reputation: 7519

Perhaps my question wasn't clear enough. Anyhow, I've managed to solve it as follows, in case it helps someone else.

To have the buffer emit only when the stream was silent for 3 seconds, I start a new timer every time a user types something (event emitted on update$), and use switchMap to cancel the previous one.

this.update$
  .buffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .map(buffer => [...new Set(buffer)])
  .subscribe(console.log);

Then, to keep the buffer unique while it is being collected rather than deduplicating it afterwards, I created a custom operator, uniqueBuffer.

this.update$
  .uniqueBuffer(this.update$.switchMap(x => Observable.timer(3000)))
  .filter(buffer => buffer.length > 0)
  .subscribe(console.log);

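// Custom operator (RxJS 5 prototype patching): collects source values in a Set
// and emits them as an array whenever `emitObservable` emits.
function uniqueBuffer(emitObservable) {
  // Regular function, so `this` is the Observable the operator is called on.
  const source = this;

  return Observable.create(subscriber => {
    const uniqueBuffer = new Set();

    const sourceSubscription = source.subscribe(
      value => uniqueBuffer.add(value),
      err => subscriber.error(err),
      () => subscriber.complete()
    );

    const emitSubscription = emitObservable.subscribe(
      () => {
        subscriber.next([...uniqueBuffer]);
        uniqueBuffer.clear();
      },
      err => subscriber.error(err)
    );

    // Teardown: release both inner subscriptions when the consumer unsubscribes.
    return () => {
      sourceSubscription.unsubscribe();
      emitSubscription.unsubscribe();
    };
  });
}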

Observable.prototype.uniqueBuffer = uniqueBuffer;
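
For anyone who wants to see the mechanics without RxJS, the same idea (a Set plus a timer that restarts on every value) can be sketched in plain JavaScript. This is only a rough model under my own assumptions: `createUniqueBuffer` and the optional `timers` parameter are hypothetical names, and `timers` exists only so the timer can be swapped out in tests.

```javascript
// Plain-JavaScript model of the operator above: collect values in a Set and
// flush them once no value has arrived for `quietMs`.
// `createUniqueBuffer` and `timers` are hypothetical names, not part of RxJS.
function createUniqueBuffer(
  quietMs,
  onFlush,
  timers = {
    set: (fn, ms) => setTimeout(fn, ms),
    clear: handle => clearTimeout(handle),
  }
) {
  const pending = new Set();
  let handle = null;

  const flush = () => {
    handle = null;
    const values = [...pending];
    pending.clear();
    onFlush(values);
  };

  return {
    push(value) {
      pending.add(value);
      // Restart the quiet-period timer, similar to switchMap cancelling
      // the previous Observable.timer.
      if (handle !== null) timers.clear(handle);
      handle = timers.set(flush, quietMs);
    },
    cancel() {
      // Drop the pending timer and any collected values.
      if (handle !== null) timers.clear(handle);
      handle = null;
      pending.clear();
    },
  };
}

// Usage (commented out so nothing is scheduled when the snippet is loaded):
// const buffer = createUniqueBuffer(3000, fields => console.log(fields));
// buffer.push('description');
// buffer.push('description');
// buffer.push('name'); // logs the unique fields 3000 ms after this last push
```

Because the timer is only started by `push`, this version never emits an empty array, so the extra `filter` step is not needed here.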

Upvotes: 1
