Reputation: 7519
A user is typing values in a form and an event is emitted every time a user edits a particular field, with the value being the field they edited.
For example a user typing 3 times into the description field, followed by two times in the name field, would look like
"description" => "description" => "description" => "name" => "name" => ...
I want to buffer unique values and emit them as an array when the user stops typing for x amount of seconds. A value may reappear again in a different buffer window.
Essentially this is to track which fields were updated when the user stopped typing and communicate with the server to save the edited values.
I have this so far which emits every 3000 ms, plus it doesn't prevent duplicates when buffering but instead we "deduplicate" the array afterwards.
this.update$
.bufferTime(3000)
.filter(buffer => buffer.length > 0)
.map(buffer => [...new Set(buffer)])
.subscribe(x => console.log(x));
So it should listen until a value is emitted and then buffer unique values until no more values have been emitted for x seconds, then emit the buffer and repeat. How can one achieve this?
Upvotes: 0
Views: 2578
Reputation: 54649
This could be an alternate version:
const { Observable } = Rx;
const log = (prefix) => (...args) => { console.log(prefix, ...args); };
const inputs = document.querySelectorAll('input');
const updates$ = Observable
.fromEvent(inputs, 'input')
.pluck('target', 'id')
;
// wait x ms after last update
const flush$ = updates$.debounceTime(3000);
const buffered$ = updates$
// use `distinct` without `keySelector`, but reset with flush$
.distinct(null, flush$)
// flush the buffer using flush$ as `notifier`
.buffer(flush$)
;
buffered$.subscribe(log('buffered$ =>'));
<script src="https://unpkg.com/@reactivex/rxjs@^5/dist/global/Rx.min.js"></script>
<div><input type="text" placeholder="foo" id="input.foo"></div>
<div><input type="text" placeholder="bar" id="input.bar"></div>
<div><input type="text" placeholder="baz" id="input.baz"></div>
Upvotes: 2
Reputation: 7519
Perhaps my question wasn't clear enough, anyhow I've managed to solve it like so: (in case it helps someone else)
To have the buffer emit only when the stream was silent for 3 seconds, I start a new timer every time a user types something (event emitted on update$
), and use switchMap
to cancel the previous one.
this.update$
.buffer(this.update$.switchMap(x => Observable.timer(3000)))
.filter(buffer => buffer.length > 0)
.map(buffer => [...new Set(buffer)])
.subscribe(console.log);
Then to get the buffer to be unique itself rather than having to manually deduplicate it, I had to create a custom operator uniqueBuffer
.
this.update$
.uniqueBuffer(this.update$.switchMap(x => Observable.timer(3000)))
.filter(buffer => buffer.length > 0)
.subscribe(console.log);
function uniqueBuffer(emitObservable) {
return Observable.create(subscriber => {
const source = this;
const uniqueBuffer = new Set();
const subscription = source.subscribe(value => {
uniqueBuffer.add(value);
});
emitObservable.subscribe(emit => {
subscriber.next([...uniqueBuffer]);
uniqueBuffer.clear();
})
})
}
Observable.prototype.uniqueBuffer = uniqueBuffer;
Upvotes: 1