Reputation: 53
I have spent a few days on this but could not find a way to do a "distinct throttle" in RxJS.
Assuming each event takes 4 dashes to complete, a "distinct throttle" would behave as follows:
-①-②-①---------①-----|->
[distinct throttle]
-①-②-------------①-----|->
How can I use existing RxJS operators to build a "distinct throttle"?
Upvotes: 5
Views: 930
Reputation: 10808
distinct and throttle differ in which item they pick: distinct keeps the first item it sees, while throttle, when configured to emit the trailing value, keeps the last.
Sometimes you want to keep throttle's behavior.
Say the stream consists of chat-message-edit events carrying the updated text. A user may edit a specific message multiple times within the throttle period.
You want to be sure that you always keep the last version of each message (among a stream of edits of different messages).
One possible solution is the following:
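import { from, of, pipe } from 'rxjs';
import { bufferTime, concatMap, delay, distinct } from 'rxjs/operators';

// Sample stream of edit events, emitted one every 500 ms.
const source$ = from([
  {id:1,content:"1a"},
  {id:1,content:"1b"},
  {id:1,content:"1c"},
  {id:2,content:"2a"},
  {id:2,content:"2b"},
  {id:3,content:"3a"},
  {id:3,content:"3b"},
  {id:1,content:"1d"},
  {id:1,content:"1e"},
  {id:4,content:"4a"},
  {id:4,content:"4b"},
  {id:4,content:"4c"},
  {id:4,content:"4e"},
  {id:4,content:"4f"},
  {id:3,content:"3c"},
  {id:3,content:"3d"},
  {id:3,content:"3e"}
]).pipe(concatMap((el) => of(el).pipe(delay(500))));

// Buffer the events of each time window, then emit only the last event per key.
// Reversing the buffer makes distinct() keep the most recent item for each key,
// so the items of a window come out newest first.
const distinctThrottle = (throttleTime, keySelector) =>
  pipe(
    bufferTime(throttleTime),
    concatMap((arr) => from(arr.reverse()).pipe(distinct(keySelector)))
  );

const throttledStream = source$.pipe(distinctThrottle(1550, ({id}) => id));
throttledStream.subscribe(console.log);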
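To make the reverse-then-distinct step easier to follow, here is a minimal sketch in plain JavaScript of what happens to a single buffered window. The helper name lastPerKey and the sample window are hypothetical and only for illustration; they are not part of RxJS or of the code above.

```javascript
// Hypothetical helper (not part of RxJS): keeps the last item per key
// from one buffered window, mirroring arr.reverse() + distinct(keySelector).
function lastPerKey(items, keySelector) {
  const seen = new Set();
  const result = [];
  // Walk from newest to oldest so the first hit per key is the latest item.
  for (let i = items.length - 1; i >= 0; i--) {
    const key = keySelector(items[i]);
    if (!seen.has(key)) {
      seen.add(key);
      result.push(items[i]);
    }
  }
  return result;
}

// Assumed contents of one window, chosen for illustration only.
const exampleWindow = [
  { id: 1, content: "1a" },
  { id: 1, content: "1b" },
  { id: 1, content: "1c" },
  { id: 2, content: "2a" },
];

const kept = lastPerKey(exampleWindow, ({ id }) => id);
console.log(kept.map(({ content }) => content)); // logs 2a and 1c, newest first
```

Note that the surviving items come out in reverse arrival order. If the original order matters, the result would need to be reversed again before it is emitted.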
Upvotes: 0
Reputation: 58400
You can use groupBy to separate the notifications by value, then apply throttleTime to each group, and then merge the grouped observables using mergeMap. Like this:
const { Subject } = rxjs;
const { groupBy, mergeMap, throttleTime } = rxjs.operators;
const source = new Subject();
const result = source.pipe(
  groupBy(value => value),
  mergeMap(grouped => grouped.pipe(
    throttleTime(400)
  ))
);
result.subscribe(value => console.log(value));
setTimeout(() => source.next(1), 100);
setTimeout(() => source.next(2), 300);
setTimeout(() => source.next(1), 400);
setTimeout(() => source.next(1), 900);
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
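The per-value throttling above can also be traced without RxJS. The following is a rough simulation in plain JavaScript, assuming leading-edge throttling (the default for throttleTime) and timestamped input events. The function name distinctThrottleSim and the event shape are hypothetical, and the handling of an event that arrives exactly at the end of the throttle window is an assumption of this sketch.

```javascript
// Hypothetical simulation: per-value, leading-edge throttling over
// timestamped events. Not an RxJS API, only a trace of the idea.
function distinctThrottleSim(events, durationMs) {
  const lastEmittedAt = new Map(); // value -> time of its last emission
  const output = [];
  for (const { time, value } of events) {
    const last = lastEmittedAt.get(value);
    // Emit if this value was never emitted, or its throttle window has passed.
    if (last === undefined || time - last >= durationMs) {
      lastEmittedAt.set(value, time);
      output.push(value);
    }
  }
  return output;
}

// Same timings as the setTimeout calls in the snippet above.
const events = [
  { time: 100, value: 1 },
  { time: 300, value: 2 },
  { time: 400, value: 1 }, // dropped: value 1 was emitted 300 ms earlier
  { time: 900, value: 1 }, // emitted: the window for value 1 has passed
];

console.log(distinctThrottleSim(events, 400)); // logs 1, 2, 1
```

The Map plays the role of the groups created by groupBy: each distinct value gets its own throttle state, so a different value is never blocked by another value's window.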
Upvotes: 8