wbeeftink
wbeeftink

Reputation: 23

How to add a limit to buffer and debounce in RxJs subscription

I would like to achieve the following using RxJs:

  1. Group message that are within ~200ms of the previous message
  2. Emit group of messages when no new messages have been received within 250ms
  3. Emit group of messages when group reaches 10 items.

Thanks to several other questions on SO, such as this one, it's quite easy to implement 1 and 2 using a combination of buffer and debounceTime, like so:

const subject$ = new Subject<number>();

// Create the debounce
const notifier$ = subject$.pipe(
  debounceTime(250)
);

// Subscribe to the subject using buffer and debounce
subject$
  .pipe(
    buffer(notifier$)
  )
  .subscribe(value => console.log(value));

// Add a number to the subject every 200ms untill it reaches 10
interval(200)
  .pipe(
    takeWhile(value => value <= 10),
  )
  .subscribe(value => subject$.next(value));

Here messages are buffered as long as they're emitted within 200ms of the last one. If it takes more than 200ms a new buffer is started. However, if messages keep coming in under 200ms the messages could be buffered forever. That's why I want to add a hard limit on the buffer size.

I created an example at StackBlitz to demonstrate the buffer debounce. But I can't figure out how to limit the buffer so that it emits when it reaches 10 items as well.

Upvotes: 2

Views: 1024

Answers (2)

C&#233;drics
C&#233;drics

Reputation: 1994

We could create another notifier to limit the number of item (eg. with elementAt), use the notifier that emits first (with race) and apply this recursively (with expand):

const notifierDebouncing$ = subject$.pipe(
  debounceTime(PERIOD),
  take(1)
);

const notifierLimiting$ = subject$.pipe(
  elementAt(AMOUNT - 1)
);

const notifier$ = interval(0).pipe(
  take(1),
  expand(_ => race(notifierLimiting$, notifierDebouncing$))
);

subject$
  .pipe(buffer(notifier$))
  .subscribe(value => console.log(value));

What do you think?

Here is an example, based on your demo app: https://stackblitz.com/edit/rxjs-buffer-debounce-cf4qjy (open the console, then move the cursor for 2000ms and stop for 500ms)

Upvotes: 3

Can't you just filter it if its 10th item? Maybe i misunderstood your question.

    interval(this.interval)
      .pipe(
        filter(value => value % 10 === 0),
        takeWhile(value => value <= this.amount),
      )
      .subscribe(value => this.subject$.next(value));
  }

Upvotes: 0

Related Questions