Reputation: 1306
I'm trying to use the RxJS debounce operator, but I want to customize when the emissions from the source are debounced.
By default, any emission from the source inside the debounce window will cause the previous emission to be dropped. I want only certain emissions from the source to count towards the debounce operation, based on the value of the source emission.
Let's say I have an observable of objects that look like this:
{
priority: 'low' // can be 'low', 'medium', or 'high'
}
I want the debounce to group by the object's priority. That means an emission will be debounced by another emission only if it has the same priority.
That is, only 'low' emissions can debounce 'low' emissions, and only 'high' emissions can debounce 'high' emissions. If a 'medium' emission arrives while a 'low' emission is waiting, it won't cause the 'low' emission to be dropped.
This means that if I had a 'low' emission and a 'medium' emission in quick succession, both would go through. If I had two 'low' emissions in quick succession, only the last one would go through.
Here is what I came up with:
import { race, timer } from 'rxjs';
import { filter, mapTo, mergeMap, take } from 'rxjs/operators';
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000
$source.pipe(
  mergeMap(value => {
    // Start a race between the value after a delay and any other emission from the source with the same priority
    return race(
      timer(delay).pipe(mapTo(value)),
      $source.pipe(
        filter(v => v.priority === value.priority),
      )
    ).pipe(
      take(1),
      // If another emission with the same priority arrives before the delay elapses, the second racer will win the race.
      // If no emission with the same priority arrives, the first racer will win.
      //
      // If the first racer wins, this equality check is satisfied and the value is passed through.
      // If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, the whole process will start again for that emission.
      filter(v => v === value),
    )
  })
)
I think the above is correct, but I'm wondering whether I'm missing something or making this far more complicated than it needs to be. The code above should behave as if it were merging three separate streams: $low.pipe(debounceTime(delay)), $medium.pipe(debounceTime(delay)), and $high.pipe(debounceTime(delay)).
Thanks!!
Upvotes: 0
Views: 306
Reputation: 8022
I think your solution works. It's also pretty clear. You do, however, have to be sure your $source is multicast.
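To illustrate the multicasting point: your code subscribes to $source once in the outer pipe and again inside every mergeMap, so a cold source would restart its producer for each of those inner subscriptions. Here's a minimal sketch, assuming the source is cold and that share() is an acceptable way to multicast it. The names cold$ and producerStarts are hypothetical and exist only for the demonstration.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

type Item = { priority: 'low' | 'medium' | 'high' };

// Hypothetical cold source that counts how many times its producer is started.
let producerStarts = 0;
const cold$ = new Observable<Item>(subscriber => {
  producerStarts += 1;
  subscriber.next({ priority: 'low' });
  // Never completes, like a long-lived event stream.
});

// share() multicasts: concurrent subscribers reuse one subscription to cold$.
const $source = cold$.pipe(share());

const a = $source.subscribe();
const b = $source.subscribe();
// producerStarts is 1 here; subscribing to cold$ directly twice would make it 2.
```

If $source is already a Subject or otherwise hot, this step isn't needed.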
There's one downside I see to your approach:
You do a lot of extra computation. If you're debouncing thousands of values per second, it might noticeably slow down, depending on where it's being run.
Each streamed value can be in any number of races. Inputs from different priorities still race each other, and when the next value starts its race, the previous race isn't stopped, so you can have an explosion of timers/races if a lot of values arrive at once.
That's a lot of extra timers being set and dropped. In your situation, you should need a maximum of three timers, each of which gets reset as a new value of the same priority arrives.
If your code isn't on the critical path that might not be a problem. Otherwise, there are other ways. The one I thought up, though, is a bit bulkier in terms of code.
Here's how my brain solved this problem. I created an operator that does what RxJS's partition operator does but lets you partition into more than two streams.
My approach handles multicasting internally, so the source can be whatever (hot, cold, multicast, or not). It (internally) sets up one subject per stream, and then you can use RxJS's debounceTime as usual.
There's a downside though. In your approach, you can add a new priority string willy-nilly and it should continue to work. Objects of {priority: "DucksSayQuack"} will debounce each other and not affect other priorities. This can even be done on the fly.
The partitionOn operator below needs to know the partitions ahead of time. For your described case it should have the same output and be a bit more efficient to boot.
Is this better? I dunno. It's a fun and different approach to solving the same problem. Also, I suppose there are more uses for the partitionOn operator than a partitioned debounce.
import { Observable, Subject } from 'rxjs';
/***
 * Create one partitioned stream per predicate. Each value is sent to
 * every stream whose predicate returns true for it.
 * Note: this subscribes to input$ as soon as it is called.
 ***/
function partitionOn<T>(
  input$: Observable<T>,
  predicates: ((v: T) => boolean)[]
): Observable<T>[] {
  const partitions = predicates.map(predicate => ({
    predicate,
    stream: new Subject<T>()
  }));
  input$.subscribe({
    next: (v: T) => partitions.forEach(prt => {
      if (prt.predicate(v)) {
        prt.stream.next(v);
      }
    }),
    complete: () => partitions.forEach(prt => prt.stream.complete()),
    error: err => partitions.forEach(prt => prt.stream.error(err))
  });
  return partitions.map(prt => prt.stream.asObservable());
}
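import { merge } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;
const priorityEquals = (a: string) => (b: { priority: string }) => a === b?.priority;
merge(
  ...partitionOn(
    $source,
    [
      priorityEquals('low'),
      priorityEquals('medium'),
      priorityEquals('high')
    ]
  ).map(s => s.pipe(
    debounceTime(delay) // use the delay constant rather than a hard-coded 1000
  ))
);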
Here's a second approach. It is very similar to yours and lets you use your priority strings willy-nilly again. It has a similar issue, though: every value is thrown into a timer, and timers aren't canceled as new values arrive.
With this approach, however, the path to canceling unnecessary timers is much clearer. You can store subscription objects alongside timestamps in the priorityTimeStamp map, and be sure to unsubscribe as new values arrive.
I really have no clue what the performance hit for this might be; I think JavaScript's event loop is pretty robust/efficient. The nice thing with this approach is that you don't pay the cost of multicasting. This is all just effectively one stream using a lookup map to decide what gets filtered and what doesn't.
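import { MonoTypeOperatorFunction, defer, timer } from 'rxjs';
import { filter, mapTo, mergeMap, timestamp } from 'rxjs/operators';
function priorityDebounceTime<T>(
  dbTime: number,
  priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
  return s => defer(() => {
    // Arrival time of the most recent value seen for each priority
    const priorityTimeStamp = new Map<string, number>();
    return s.pipe(
      mergeMap(v => {
        const key = (v as any)[priorityStr] as string;
        priorityTimeStamp.set(key, Date.now());
        return timer(dbTime).pipe(
          timestamp(),
          // Pass the value only if no newer value with the same priority arrived while the timer ran
          filter(({ timestamp }) =>
            timestamp - (priorityTimeStamp.get(key) ?? 0) >= dbTime
          ),
          mapTo(v)
        );
      })
    );
  });
}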
Using it is obviously a bit simpler:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;
$source.pipe(
  priorityDebounceTime(delay)
).subscribe(console.log);
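For comparison, here's a sketch of the same idea using RxJS's built-in groupBy operator instead of either approach above. It doesn't need to know the priorities ahead of time, and because each group gets its own debounceTime, there should be at most one pending timer per priority. The name debounceTimeByPriority is hypothetical, and the optional scheduler parameter is my own addition so that the timing can be driven by a test scheduler.

```typescript
import { MonoTypeOperatorFunction, SchedulerLike, asyncScheduler } from 'rxjs';
import { debounceTime, groupBy, mergeMap } from 'rxjs/operators';

// Hypothetical helper; not part of RxJS.
function debounceTimeByPriority<T extends { priority: string }>(
  dueTime: number,
  scheduler: SchedulerLike = asyncScheduler // injectable for testing
): MonoTypeOperatorFunction<T> {
  return source$ =>
    source$.pipe(
      // One inner stream per distinct priority, created the first time it is seen.
      groupBy(v => v.priority),
      // Debounce each group independently, then merge the results back together.
      mergeMap(group$ => group$.pipe(debounceTime(dueTime, scheduler)))
    );
}
```

You'd use it like any other operator: $source.pipe(debounceTimeByPriority(delay)). One caveat: groupBy keeps a group alive for every distinct priority it has seen, which is fine for three priorities but worth knowing if the set of keys is unbounded.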
Upvotes: 1