Rachid O
Rachid O

Reputation: 14002

Rxjs buffer when source emitions rate is fast

let's say for instance I have an observable emiting at x0 emitions per second (maybe 50, 60, ...) sometimes and sometimes it's just 1 or 2 emitions per second.

Now how can I Buffer those fast emitions and still handling slow ones.

What I tired:

BufferTime needs a time span, so even if one emition it will be bufferd, (plus BufferTime makes protractor tests timeout).

BufferCount(x) doesn't emit until all x emitions are received.

Upvotes: 1

Views: 719

Answers (2)

Ignacio Bustos
Ignacio Bustos

Reputation: 1494

As bygrace mentioned, what you are looking for is debounce + buffer

In modern RXJS 6 and with ES6 Typescript to add type inference, I created a custom OperatorFunction to do that very straightforward, called bufferDebounce.

type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;

const bufferDebounce: BufferDebounce = debounce => source =>
  new Observable(observer => 
    source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
      next(x) {
        observer.next(x);
      },
      error(err) {
        observer.error(err);
      },
      complete() {
        observer.complete();
      },
  })
// [as many sources until no emit during 500ms]
source.pipe(bufferDebounce(500)).subscribe(console.log);

you can see a working example here https://stackblitz.com/edit/rxjs6-buffer-debounce

Hope this helps to you and any newcomers.

Upvotes: 0

bygrace
bygrace

Reputation: 5988

Sounds like you want something similar to debounce + buffer. The simplest implementation of that is to use a debounce of the stream to trigger emiting the buffer of that same stream. You may want to share the stream to prevent duplicate subscriptions. Here is a running example:

const source = new Rx.Observable.create((o) => {
  let count = 0;
  const emit = () => {
    const timeout = Math.random() * 1000;
    setTimeout(() => {
      o.next(count++);
      if (count < 20) {
      	emit();
      } else {
        o.complete();
      }
    }, timeout);
  };
  emit();
}).share();

const triggerBuffer = source.debounceTime(500);
source.buffer(triggerBuffer).subscribe((x) => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>

Note that the debounce does not have an upper limit in that it wont emit if it continues to receive values under the debounce time. Practically this shouldn't make a difference in your scenario but in other scenarios it theoretically could.

Upvotes: 1

Related Questions