Congobill

Reputation: 83

DebounceTime after first value in RxJS

I need a specific behavior that I can't get with the built-in RxJS operators. The closest would be to apply debounceTime only to values that arrive after the first one, but I can't find a way to do that. I have also tried throttleTime, but it is not exactly what I am looking for, since it emits intermediate values, and I only want one emission at the beginning that is instantaneous and another at the end, nothing else.

ThrottleTime

throttleTime(12 ticks, { leading: true, trailing: true })

source:             --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output:             --0-----------3-----------6-----------7-----------9--------


source_2:           --0--------1------------------2--------------3---4---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2:           --0-----------1---------------2--------------3-----------4-

DebounceTime

debounceTime(500)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             -----------1--------3--------------------------------13---------

What I want

debounceTimeAfterFirst(500) (?)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             --0--------1--3------------4-------------------------13---------

As you can see, the debounce timer starts whenever a new value arrives. If the debounce time passes and no new value has arrived, it stops the debounce and waits for the next value to start a new cycle.
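To pin the intended semantics down independently of any operator, here is a minimal sketch in plain TypeScript (no RxJS) that replays a list of timestamped values and computes what should be emitted. The names `Timed` and `leadingThenDebounce` are hypothetical, the tick positions are approximate readings of the diagram above, and treating a value that arrives exactly when the window closes as a new leading value is an assumption.

```typescript
// Hypothetical model of the requested behavior; not an RxJS operator.
interface Timed<T> {
  time: number; // arrival (or emission) time in ticks
  value: T;
}

function leadingThenDebounce<T>(events: Timed<T>[], interval: number): Timed<T>[] {
  const out: Timed<T>[] = [];
  let deadline = -Infinity; // when the current quiet window ends
  let pending: Timed<T> | undefined; // last value seen inside the window

  for (const e of events) {
    if (e.time >= deadline) {
      // The window closed before this value arrived:
      // flush the debounced value (if any), then emit this one immediately.
      if (pending !== undefined) {
        out.push({ time: deadline, value: pending.value });
        pending = undefined;
      }
      out.push(e);
    } else {
      // Still inside the window: remember the value, emit nothing yet.
      pending = e;
    }
    // Every value restarts the quiet window.
    deadline = e.time + interval;
  }

  // Flush whatever is pending once the last window closes.
  if (pending !== undefined) {
    out.push({ time: deadline, value: pending.value });
  }
  return out;
}

// Approximate tick positions from the "What I want" diagram, window of 6 ticks.
const diagramSource: Timed<number>[] = [
  { time: 2, value: 0 }, { time: 5, value: 1 }, { time: 14, value: 3 },
  { time: 27, value: 4 }, { time: 29, value: 5 }, { time: 31, value: 6 },
  { time: 33, value: 7 }, { time: 35, value: 8 }, { time: 37, value: 9 },
  { time: 39, value: 10 }, { time: 42, value: 11 }, { time: 46, value: 13 },
];

const emittedValues = leadingThenDebounce(diagramSource, 6).map(e => e.value);
console.log(emittedValues.join(', ')); // 0, 1, 3, 4, 13
```

Values 0, 3 and 4 are emitted on arrival because the previous window has already closed, while 1 and 13 are emitted only after a quiet window.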

Edit: I forgot to mention that this must be integrated with NgRx Effects, so it must be a continuous stream that never completes. Completing it would probably cause it to stop listening for dispatched actions.

Upvotes: 7

Views: 6079

Answers (4)

Ignacio

Reputation: 1

This is the operator I wrote:

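import { merge, Observable, OperatorFunction } from 'rxjs';
import { debounceTime, share, throttle } from 'rxjs/operators';

export function throttleDebounce<T>(time: number): OperatorFunction<T, T> {
  return (source: Observable<T>): Observable<T> => {
    // Trailing edge: the last value once the source has been quiet for `time` ms.
    const debounced = source.pipe(debounceTime(time), share());

    // Leading edge: throttle passes the first value and stays closed until `debounced` emits.
    return merge(source.pipe(throttle(() => debounced)), debounced);
  };
}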

Upvotes: 0

Ramon Blanquer

Reputation: 454

I would use a throttle combined with a debounceTime:

  • throttle (from the documentation): Emit value on the leading edge of an interval, but suppress new values until durationSelector has completed.

  • debounceTime (from the documentation): Discard emitted values that take less than the specified time between output.

I would use a throttle stream to get the rising edge (the first emission), and then the debounce stream would give us the falling edge.

const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
  pluck('target', 'value')
);

const debounced = source.pipe(
  debounceTime(4000),
  map((v) => `[d] ${v}`)
);

const effect = merge(
  source.pipe(
    throttle((val) => debounced),
    map((v) => `[t] ${v}`)
  ),
  debounced
);

effect.subscribe(console.log);

See RxJS StackBlitz with the console open to see the values changing.

I prepared the setup to adapt it to NgRx which you mention. The effect I got working is:

@Injectable({ providedIn: 'root' })
export class FooEffects {
  switchLight$ = createEffect(() => {
    const source = this.actions$.pipe(
      ofType('[App] Switch Light'),
      pluck('onOrOff'),
      share()
    );
    const debounced = source.pipe(debounceTime(1000), share());
    return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
      map((onOrOff) => SetLightStatus({ onOrOff }))
    );
  });

  constructor(private actions$: Actions) {}
}

See NgRx StackBlitz with the proposed solution working in the context of an Angular NgRx application.

  • share: This operator prevents each downstream path from subscribing separately all the way up the chain; instead, they all receive values from the point where you place share.
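The effect of share can be checked by counting upstream subscriptions. This is a minimal sketch, assuming RxJS 7; `upstream$` and the counters are hypothetical names, and the source deliberately never emits or completes so that share does not reset between subscribers.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Hypothetical source that only counts how often it is subscribed to.
let subscribeCount = 0;
const upstream$ = new Observable<string>(() => {
  subscribeCount += 1;
});

// With share(): both subscribers reuse a single upstream subscription.
const shared$ = upstream$.pipe(share());
const first = shared$.subscribe();
const second = shared$.subscribe();
const sharedCount = subscribeCount; // 1

// Without share(): every subscriber subscribes to the upstream on its own.
subscribeCount = 0;
const third = upstream$.subscribe();
const fourth = upstream$.subscribe();
const unsharedCount = subscribeCount; // 2

console.log({ sharedCount, unsharedCount });

[first, second, third, fourth].forEach(s => s.unsubscribe());
```

In the effect above this matters because both the throttle branch and the debounce branch read from the same action stream.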

I also tried to adapt @martin's connect() approach, but I don't see how it would "reset" the system so that a source value emitted after a long pause is passed through immediately, as on the first run, instead of being debounced. @martin, feel free to fork it and tweak it to make it work. I'm curious about your approach, which is very smart; I didn't know about connect().

@avicarpio give it a go on your application and let us know how it goes :)

Upvotes: 7

Mrk Sef

Reputation: 8022

Here's a custom operator that (as far as I can tell) does what you're after.

The two key insights here are:

  1. Use connect so that you can subscribe to the source twice, once to ignore emissions with exhaustMap and another to inspect and debounce emissions with switchMap
  2. Create an internal token so that you know when to exit without a debounced emission. (Ensures that, in your example above, the 4 is still emitted.)

function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
  // Use this token's memory address as a nominal token
  const resetToken = {};

  return connect(s$ => s$.pipe(
    exhaustMap(a => s$.pipe(
      startWith(resetToken),
      switchMap(b => timer(interval).pipe(mapTo(b))),
      take(1),
      filter<T>(c => c !== resetToken),
      startWith(a)
    ))
  ));
}

Example:

of(1,2,3,4).pipe(
  throttleDebounceTime(500)
).subscribe(console.log);

// 1 [...0.5s wait] 4

Upvotes: 0

martin

Reputation: 96891

I think you could do it like the following, even though I can't think of any easier solution right now (I'm assuming you're using RxJS 7+ with connect() operator):

connect(shared$ => shared$.pipe(
  exhaustMap(value => merge(
    of(value),
    shared$.pipe(debounceTime(1000)),
  ).pipe(
    take(2),
  )),
)),

Live demo: https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts

connect() will share the source Observable and lets you reuse it in its project function multiple times. I'm using it only to use the source Observable inside another chain.

exhaustMap() will ignore all next notifications until its inner Observable completes. In this case the inner Observable will immediately reemit the current value (of(value)) and then use debounceTime(). Any subsequent emission from source is ignored by exhaustMap() because the inner Observable hasn't completed yet but is also passed to debounceTime(). Then take(2) is used to complete the chain after debounceTime() emits and the whole process can repeat when source emits because exhaustMap() won't ignore the next notification (its inner Observable has completed).
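For reference, the snippet above can be wrapped into a reusable operator and driven by a Subject. This is a minimal sketch, assuming RxJS 7; the operator name `leadingThenTrailing`, the `actions$` subject and the `seen` array are hypothetical and only serve the illustration.

```typescript
import { MonoTypeOperatorFunction, Observable, Subject, merge, of } from 'rxjs';
import { connect, debounceTime, exhaustMap, take } from 'rxjs/operators';

// Hypothetical wrapper around the connect() + exhaustMap() chain shown above.
function leadingThenTrailing<T>(dueTime: number): MonoTypeOperatorFunction<T> {
  return connect((shared$: Observable<T>) => shared$.pipe(
    exhaustMap(value => merge(
      of(value),                            // re-emit the leading value immediately
      shared$.pipe(debounceTime(dueTime)),  // then wait for the source to go quiet
    ).pipe(
      take(2),                              // complete after the debounced emission
    )),
  ));
}

// A Subject stands in for a stream of dispatched actions; it never completes.
const actions$ = new Subject<string>();
const seen: string[] = [];
const subscription = actions$
  .pipe(leadingThenTrailing(1000))
  .subscribe(value => seen.push(value));

actions$.next('a'); // passed through immediately
actions$.next('b'); // ignored by exhaustMap(), but fed to debounceTime()
actions$.next('c'); // same; 'c' is emitted once the source has been quiet for 1000 ms

console.log(seen); // [ 'a' ] at this point
```

The output subscription stays open after each cycle, because only the inner Observable created by exhaustMap() completes.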

Upvotes: 0
