knallfrosch
knallfrosch

Reputation: 366

RxJs limit amount of subscribe calls

I listen to ResizeObserver (or rather, a polyfill) to trigger the redrawing of a fabricjs canvas, which is quite resource-heavy. Therefore, I want to limit the amount of redraws that are triggered by the resize event.

I tried implementing this functionlity with RxJs:

  1. Redraw gets triggered immediately (first time)
  2. Redraw does not get triggered for n milliseconds
  3. Redraw does get triggered after n millisceonds
  4. Redraw does get triggered for last resize event

RxJs offers some built-in time-based operators. However, they all have their flaws:

I tried merging/joining these operators but this led to double-calling after n seconds and other problems. Is there a simple RxJs way to do this? I imagine it could be possible by starting/clearing a time-out function.

Upvotes: 2

Views: 1822

Answers (3)

Dorus
Dorus

Reputation: 7546

You could write your own function and keep track of the last emitted and received item in a local variable. To read the time in Rx you should use scheduler.now(), this keeps your code testable. With switchMap you can emulate throttle behavior by using switchMap(()=>of(e).pipe(delay(duration))). Using this, we can special case the situation where the last item was so long ago we want to emit the next one right away.

This resulted in the following solution:

function limitTime(duration, scheduler = async) {
  return (src) => new Observable(ob => {
      var last = scheduler.now() - duration;
      return src.pipe(
          switchMap(e => {
            var last2 = last;
            last = scheduler.now();
            if(last - last2 > duration) {
              return of(e);
            }
            return of(e).pipe(delay(duration + last2 - last, scheduler),
                             tap(() => last = scheduler.now()));
          })
        ).subscribe(ob);
    })
}

See it in action with some tests here.

Upvotes: 1

frido
frido

Reputation: 14139

You could use throttleTime with the option trailing to emit the last event. The last event will be emitted with the given delay and not right away when you stop resizing.

As you also want the first event to be emitted you additionally require the default option leading. Having both set to true will lead to two events being fired directly one after another at the end and beginning of each new time interval. To prevent this you could add debounceTime with some small timespan like 50ms.

import { fromEvent, asyncScheduler } from 'rxjs';
import { throttleTime, debounceTime, map } from 'rxjs/operators';

const source = fromEvent(window, 'resize').pipe(map(e => e.target['innerWidth']));
const width = source.pipe(
  throttleTime(1000, asyncScheduler, { leading: true, trailing: true }),
  debounceTime(50)
)

https://stackblitz.com/edit/typescript-qsjhvu

Upvotes: 1

pascalpuetz
pascalpuetz

Reputation: 5428

First that comes to mind is writing your own pipeable operator: https://github.com/ReactiveX/rxjs/blob/master/doc/pipeable-operators.md#build-your-own-operators-easily

Alternatively, you could try something like this:

const source$:Observable<MyEvent> = getEventObservable(); // Get it from fabric

merge(
    source$.pipe(throttle(100)), // max once in fixed interval
    source$.pipe(debounce(100)), // debounce to get the last value
).pipe(
   distinctUntilChanged() // In case both fire at the same time
).subscribe(val => {
    // Your code
})

Upvotes: 1

Related Questions