Reputation: 83
I need a specific behavior that I can't get with the RxJS operators. The closest would be to use DebounceTime only for values entered after the first one, but I can't find a way to do it. I have also tried with ThrottleTime but it is not exactly what I am looking for, since it launches intermediate calls, and I only want one at the beginning that is instantaneous, and another at the end, nothing else.
ThrottleTime
throttleTime(12 ticks, { leading: true, trailing: true })
source: --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output: --0-----------3-----------6-----------7-----------9--------
source_2: --0--------1------------------2--------------3---4---------
throttle interval: --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2: --0-----------1---------------2--------------3-----------4-
DebounceTime
debounceTime(500)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: -----------1--------3--------------------------------13---------
What I want
debounceTimeAfterFirst(500) (?)
source: --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval: -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output: --0--------1--3------------4-------------------------13---------
As you see, the debounce time is activated when a new value is entered. If the debounce time passes and any new value has been entered, it stops the listening the debounceTime action and waits to start a new one.
Edit: I forgot to comment that this must be integrated with NgRx’s Effects, so it must be a continuous stream that mustn't be completed. Terminating it would probably cause it to stop listening for dispatched actions.
Upvotes: 7
Views: 6079
Reputation: 1
this is the operator i wrote:
export function throttleDebounce<T>(time: number): OperatorFunction<T, T> {
return (source: Observable<T>): Observable<T> => {
const debounced = source.pipe(debounceTime(time), share());
return merge(source.pipe(throttle(() => debounced)), debounced).pipe(map((value) => value));
};
}
Upvotes: 0
Reputation: 454
I would use a throttle
combined with a debounceTime
:
throttle: from Documentation Emit value on the leading edge of an interval, but suppress new values until durationSelector has completed.
debounceTime: from Documentation Discard emitted values that take less than the specified time between output.
I would use a throttle stream to get the raising edge (the first emission) and then the debounce stream would give us the falling edge.
const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
pluck('target', 'value')
);
const debounced = source.pipe(
debounceTime(4000),
map((v) => `[d] ${v}`)
);
const effect = merge(
source.pipe(
throttle((val) => debounced),
map((v) => `[t] ${v}`)
),
debounced
);
effect.subscribe(console.log);
See RxJS StackBlitz with the console open to see the values changing.
I prepared the setup to adapt it to NgRx which you mention. The effect I got working is:
@Injectable({ providedIn: 'root' })
export class FooEffects {
switchLight$ = createEffect(() => {
const source = this.actions$.pipe(
ofType('[App] Switch Light'),
pluck('onOrOff'),
share()
);
const debounced = source.pipe(debounceTime(1000), share());
return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
map((onOrOff) => SetLightStatus({ onOrOff }))
);
});
constructor(private actions$: Actions) {}
}
See NgRx StackBlitz with the proposed solution working in the context of an Angular NgRx application.
I also tried to adapt @martin's connect()
approach. But I don't know how @martin would "reset" the system so that after a long time if a new source value is emitted would not debounce it just in the same manner as you first run it, @martin, feel free to fork it and tweak it to make it work, I'm curious about your approach, which is very smart. I didn't know about connect()
.
@avicarpio give it a go on your application and let us know how it goes :)
Upvotes: 7
Reputation: 8022
Here's a custom operator that (as far s I can tell) does what you're after.
The two key insights here are:
exhaustMap
and another to inspect and debounce emissions with switchMap
function throttleDebounceTime<T>(interval: number): MonoTypeOperatorFunction<T> {
// Use this token's memory address as a nominal token
const resetToken = {};
return connect(s$ => s$.pipe(
exhaustMap(a => s$.pipe(
startWith(resetToken),
switchMap(b => timer(interval).pipe(mapTo(b))),
take(1),
filter<T>(c => c !== resetToken),
startWith(a)
))
));
}
example:
of(1,2,3,4).pipe(
throttleDebounceTime(500)
).subscribe(console.log);
// 1 [...0.5s wait] 4
Upvotes: 0
Reputation: 96891
I think you could do it like the following, even though I can't think of any easier solution right now (I'm assuming you're using RxJS 7+ with connect()
operator):
connect(shared$ => shared$.pipe(
exhaustMap(value => merge(
of(value),
shared$.pipe(debounceTime(1000)),
).pipe(
take(2),
)),
)),
Live demo: https://stackblitz.com/edit/rxjs-qwoesj?devtoolsheight=60&file=index.ts
connect()
will share the source Observable and lets you reuse it in its project function multiple times. I'm using it only to use the source Observable inside another chain.
exhaustMap()
will ignore all next
notifications until its inner Observable completes. In this case the inner Observable will immediately reemit the current value (of(value)
) and then use debounceTime()
. Any subsequent emission from source is ignored by exhaustMap()
because the inner Observable hasn't completed yet but is also passed to debounceTime()
. Then take(2)
is used to complete the chain after debounceTime()
emits and the whole process can repeat when source emits because exhaustMap()
won't ignore the next
notification (its inner Observable has completed).
Upvotes: 0