Reputation: 14002
Let's say, for instance, I have an observable that sometimes emits x0 times per second (maybe 50, 60, ...) and sometimes emits just 1 or 2 times per second.
Now, how can I buffer those fast emissions while still handling the slow ones?
What I tried:
BufferTime needs a time span, so even a single emission gets buffered (plus BufferTime makes Protractor tests time out).
BufferCount(x) doesn't emit until all x emissions have been received.
Upvotes: 1
Views: 719
Reputation: 1494
As bygrace mentioned, what you are looking for is debounce + buffer.
In modern RxJS 6, with TypeScript for type inference, I created a custom OperatorFunction called bufferDebounce that does this in a straightforward way.
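import { Observable, OperatorFunction } from 'rxjs';
import { buffer, debounceTime } from 'rxjs/operators';

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;

// Collects source values and emits them as one array once the source
// has been quiet for `debounce` ms. Note that `source` is subscribed twice
// (once for the values, once for the debounce trigger).
const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer =>
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
    })
  );

// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log);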
You can see a working example here: https://stackblitz.com/edit/rxjs6-buffer-debounce
Hope this helps you and any newcomers.
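To make the grouping rule from the comment above concrete ("as many sources until no emit during 500ms"), here is a small timer-free sketch. It is only a model of the behaviour, not RxJS code: groupByQuietGap is a hypothetical helper, the timestamps are made-up sample data, and treating a gap of exactly quietMs as "quiet" is an assumption of the model.

```typescript
// Hypothetical helper (not part of RxJS): models how debounce + buffer
// would partition emissions, given their timestamps in milliseconds.
function groupByQuietGap(timestamps: number[], quietMs: number): number[][] {
  const groups: number[][] = [];
  let current: number[] = [];
  for (const t of timestamps) {
    // A gap of at least quietMs since the previous emission closes the buffer.
    if (current.length > 0 && t - current[current.length - 1] >= quietMs) {
      groups.push(current);
      current = [];
    }
    current.push(t);
  }
  // Like buffer() on completion, flush whatever is still pending.
  if (current.length > 0) groups.push(current);
  return groups;
}

// A fast burst, one isolated emission, then a short burst (sample data).
const groups = groupByQuietGap([0, 20, 40, 60, 700, 1500, 1520], 500);
console.log(groups); // [[0, 20, 40, 60], [700], [1500, 1520]]
```

The fast burst ends up in one array, while a slow emission comes through as an array of one, which is the behaviour asked for in the question.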
Upvotes: 0
Reputation: 5988
Sounds like you want something similar to debounce + buffer. The simplest implementation is to use a debounced copy of the stream as the trigger that makes the buffer of that same stream emit. You may want to share the stream to prevent duplicate subscriptions. Here is a running example:
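// Emits 0..19 at random intervals of up to 1 second, then completes.
const source = Rx.Observable.create((o) => {
  let count = 0;
  const emit = () => {
    const timeout = Math.random() * 1000;
    setTimeout(() => {
      o.next(count++);
      if (count < 20) {
        emit();
      } else {
        o.complete();
      }
    }, timeout);
  };
  emit();
}).share(); // share so the buffer and its trigger use a single subscription

// Fires once the source has been quiet for 500 ms.
const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });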
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>
Note that the debounce has no upper limit: it won't emit for as long as it keeps receiving values within the debounce time. In practice this shouldn't make a difference in your scenario, but in other scenarios it theoretically could.
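The caveat above can be illustrated with a timer-free sketch. This is a model only, not RxJS code: partitionEmissions and its maxSpanMs cap are hypothetical, and the cap is one possible way to bound a buffer, not something either answer implements. The model decides only how emissions are grouped, not when each buffer would be delivered.

```typescript
// Hypothetical model (not an RxJS API): partitions emission timestamps (ms)
// the way debounce + buffer would, with an optional cap on a buffer's time span.
function partitionEmissions(
  timestamps: number[],
  quietMs: number,
  maxSpanMs: number = Infinity
): number[][] {
  const buffers: number[][] = [];
  let current: number[] = [];
  for (const t of timestamps) {
    if (current.length > 0) {
      // Plain debounce rule: a quiet gap closes the buffer.
      const quietElapsed = t - current[current.length - 1] >= quietMs;
      // Assumed extra rule: close the buffer once it spans maxSpanMs.
      const capReached = t - current[0] >= maxSpanMs;
      if (quietElapsed || capReached) {
        buffers.push(current);
        current = [];
      }
    }
    current.push(t);
  }
  if (current.length > 0) buffers.push(current);
  return buffers;
}

// A steady stream: one emission every 100 ms, 30 emissions in total.
const steady = Array.from({ length: 30 }, (_, i) => i * 100);

const uncapped = partitionEmissions(steady, 500);
const capped = partitionEmissions(steady, 500, 1000);
console.log(uncapped.length);            // 1: nothing is released until the stream ends
console.log(capped.map(b => b.length));  // [10, 10, 10]
```

With gaps that never reach 500 ms, the uncapped model holds every value in a single buffer until the stream ends, which is the situation the note warns about.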
Upvotes: 1