Rico Kahler
Rico Kahler

Reputation: 19302

RxJS Debounce with priority

I'm having trouble coming up with this stream.

What I'm looking for is something like debounceTime but with priority.

So if I have events with the shape { type: 'a', priority: 2 }. These events needs to be debounced by a few seconds but instead of the last event being emitted, the event with the highest priority is emitted.

input stream:
------(a|1)--(b|3)---(c|2)-----------------------(a|1)-----------------


output stream:
-----------------------------------(b|3)---------------------(a|1)-----

I've try looking at other operators like window and filtering through the result for the last event but it's not ideal because window work on a fixed cadence where I want the timer to start on the first event like debouncing does.

Upvotes: 6

Views: 495

Answers (3)

frido
frido

Reputation: 14139

You have to store and update the item with the highest priority and map to this highest value which you then pass to debounceTime.

let highest = null;
source$.pipe(
  map(v => highest = highest && highest.priority > v.priority ? highest : v),
  debounceTime(2000),
  tap(() => highest = null)
);

You can create your own operator that does this with the help of defer. defer makes sure that every subscriber gets its own highest variable, as every subscriber will get its own new Observable created by calling the given factory function.

function debounceTimeHighest<T>(dueTime: number, getHighest: (curr: T, high: T) => T): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    let highest: T = null;
    return source.pipe(
      map(item => highest = highest ? getHighest(item, highest) : item),
      debounceTime(dueTime),
      tap(() => highest = null)
    );
  });
}

// Usage
source$.pipe(
  debounceTimeHighest(2000, (v1, v2) => v1.priority >= v2.priority ? v1 : v2)
)

The code above is Typescript. If you want plain Javascript just remove all the types.

https://stackblitz.com/edit/rxjs-hitqxk

Upvotes: 3

dmcgrandle
dmcgrandle

Reputation: 6070

I'll offer the following solution, based around using scan to offer up the highest given priority emission so far for consideration by debounceTime(). Note that scan needs to reconsider new data after every successful debounce, so I use the operator window() to split up the emissions, starting a new observable window after every emission by debounceTime().

Here is the CodeSandbox

And here is some simplified code from the CodeSandbox showing the important bits:

const resetScan$ = new Subject();

source$.pipe(
  window(resetScan$),
  mergeMap(win$ => win$.pipe(
    scan((acc, cur) => acc.priority >= cur.priority ? acc : cur )
  )),
  debounceTime(debounceDelay),
  tap(() => resetScan$.next())
);

Upvotes: 2

Ashish Patel
Ashish Patel

Reputation: 345

You can combine the debounceTime and buffer and filter operator to achieve what you need. I have developed this small example for it.

https://stackblitz.com/edit/typescript-lwzt4k

/*
Collect clicks that occur, after 250ms emit array of clicks
*/
clicks$.pipe(
  buffer(clicks$.pipe(debounceTime(1000))),
  // if array is greater than 1, double click occured
  map((clickArray) => {
    document.querySelector('#emittedObjects').innerHTML = (`<div>${JSON.stringify(clickArray)}</div>`); 
    const sortedArray = clickArray.sort((a, b) => {
      return a.priority < b.priority ? 1 : -1;
    });

    const output = sortedArray.length > 0 ? sortedArray[0] : null;
    document.querySelector('#mappedOutput').innerHTML = JSON.stringify(output);
    return output;
  })
)
.subscribe((obj) => {
  const str = obj ? JSON.stringify(obj) : 'NULL';
  document.querySelector('#throttledOutput').innerHTML = `<div>THROTTLED: ${str}</div>`;
});

Upvotes: 2

Related Questions