Reputation: 311
"Here you have", someone says, and you are handed this input stream of values, on which you more or less want to run distinctUntilChanged()...
Input: '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'
Nothing weird so far.
But now someone says it's okay if the same value comes again, "but only if it's not too soon!". I want at least '----'
ticks between occurrences of the same value. "Okay", you say, and you add a throttle:
const source = new Subject<number>();
// a mysterious cave troll randomly calls source.next(oneOrTwo)
const example = source.pipe(throttle(val => interval(4000)));
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
"That's not what I want! Look at all the values you missed", they say, pointing out that the throttle applies to every value in the stream rather than to each distinct value separately.
Input: '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
'-------------->1<--------->2<----->1<------|' <-- Missed values
"Here, let me show you", the mysterious man says, and gives you this:
Input: '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'
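To pin down the rule that last diagram implies, here is a minimal sketch in plain TypeScript (no RxJS) that replays a marble string and lets a value through only when at least `gap` quiet frames separate it from the previous emission of the same value. The function name `simulatePerValueThrottle`, the one-character-per-value format, and the frame-counting rule are assumptions chosen to match the diagram; none of this is an RxJS API.

```typescript
// Hypothetical helper: simulates "throttle per distinct value" on a marble string.
// Each character is one frame: '-' is silence, '|' is completion,
// and any other character is treated as an emitted value.
function simulatePerValueThrottle(input: string, gap: number): string {
  // Frame index of the last emission that was let through, per value.
  const lastEmit: { [value: string]: number } = {};
  let output = '';
  for (let t = 0; t < input.length; t++) {
    const ch = input[t];
    if (ch === '-' || ch === '|') {
      output += ch;
      continue;
    }
    const last = lastEmit[ch];
    // Frames strictly between the previous emission of this value and now.
    const quietFrames = last === undefined ? Infinity : t - last - 1;
    if (quietFrames >= gap) {
      lastEmit[ch] = t;
      output += ch; // far enough apart: let it through
    } else {
      output += '-'; // too soon after the same value: drop it
    }
  }
  return output;
}

const desired = simulatePerValueThrottle(
  '1-1----11---2--1112----1---2---2-2-1-2-----|',
  4
);
console.log(desired);
```

Run against the input above with a gap of 4, this reproduces the desired output line exactly, which suggests the requirement is "throttle each value independently" rather than "throttle the whole stream".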
My feeling is that a combined window wouldn't do it.
To someone more experienced:
is this a hard problem to solve, or have I missed an obvious solution?
Upvotes: 8
Views: 2383
Reputation: 311
I found a solution that works. Does anyone have thoughts on it?
source.pipe(
windowTime(4000),
concatMap(obs => obs.pipe(distinct()))
);
Examples from before, in a StackBlitz example
UPDATE: this does not actually work 100%. It only takes the current window into consideration. So you can, for example, have `[1-12][2---]`, which would give `1--22---|`, where `[----]` represents the time window. In other words, if a value is emitted last in one window and first in the next window, the same value passes through twice in a row.
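The window-boundary problem can be reproduced without RxJS. The following is a small sketch, assuming one character per frame and windows that are a fixed number of frames wide; `simulateWindowedDistinct` is a made-up name that only mimics what windowTime plus distinct does to a marble string.

```typescript
// Hypothetical helper: applies "distinct" independently inside each fixed-size window.
function simulateWindowedDistinct(input: string, windowSize: number): string {
  let output = '';
  for (let start = 0; start < input.length; start += windowSize) {
    // The set of seen values is reset at every window boundary.
    const seen: { [value: string]: boolean } = {};
    const end = Math.min(start + windowSize, input.length);
    for (let t = start; t < end; t++) {
      const ch = input[t];
      if (ch === '-' || ch === '|') {
        output += ch;
      } else if (seen[ch]) {
        output += '-'; // duplicate inside the same window: dropped
      } else {
        seen[ch] = true;
        output += ch; // first occurrence in this window: passes
      }
    }
  }
  return output;
}

console.log(simulateWindowedDistinct('1-122---|', 4)); // prints "1--22---|"
```

The two adjacent 2s survive because they fall on opposite sides of the boundary between the first and second window.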
Thanks @eric99 for making me realize this.
Upvotes: 0
Reputation:
This is my second attempt: it filters the stream by output value (rather than using distinctUntilChanged), throttles each filtered stream, and merges the results.
Of course, we may not have a known set of values (1, 2, ... n).
If I can figure out that wrinkle, I will add a further example.
const output = merge(
source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)
Here is my check (ms = 4000):
input        1-1----11---2--1112----1---2---2-2-1-2-----
expected     1------1----2--1--2----1---2-----2-1-------
filter(1)    1-1----11------111-----1-----------1-------
throttle(1)  1------1-------1-------1-----------1-------
filter(2)    ------------2-----2--------2---2-2---2-----
throttle(2)  ------------2-----2--------2-----2---------
merged       1------1----2--1--2----1---2-----2-1-------
expected     1------1----2--1--2----1---2-----2-1-------
I think this will work where the set of values in the stream is not known in advance (or has a large range so extending the previous answer is impractical).
It should work as long as the source completes.
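source.pipe(
  distinct(),
  // One inner stream per distinct value, each filtered to that value and throttled on its own.
  // Each inner stream subscribes to source again, so a plain Subject would likely
  // not replay the value that triggered the subscription.
  mergeMap(distinctVal => source.pipe(
    filter(val => val === distinctVal),
    throttle(val => interval(ms))
  ))
)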
I don't have a proof yet, will post that next.
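Another option that does not need the set of values up front is to group the stream by value and throttle each group separately. This is a different technique from the distinct-based code above: a sketch using the RxJS operators groupBy, mergeMap and throttleTime, where the operator name `throttlePerValue` is made up for illustration. It has not been checked against the exact frame boundaries in the marble diagrams.

```typescript
import { Observable, OperatorFunction, of } from 'rxjs';
import { groupBy, mergeMap, throttleTime } from 'rxjs/operators';

// Hypothetical operator: throttles each distinct value independently.
export function throttlePerValue<T>(ms: number): OperatorFunction<T, T> {
  return (source: Observable<T>) =>
    source.pipe(
      // Split the stream into one inner stream per distinct value.
      groupBy((value) => value),
      // Throttle every inner stream on its own and merge the results back together.
      mergeMap((group) => group.pipe(throttleTime(ms)))
    );
}

// Usage with a synchronous source: repeats arrive "too soon" and are dropped.
const seen: number[] = [];
of(1, 1, 2, 1, 2, 3)
  .pipe(throttlePerValue<number>(4000))
  .subscribe((value) => seen.push(value));
console.log(seen);
```

Because the source is subscribed to only once, this should also work with a hot Subject. One caveat: groupBy keeps a group alive for every value it has seen until the source completes, so a stream with an unbounded range of values would keep accumulating groups.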
Upvotes: 1
Reputation: 11345
Here is a tricky solution based on how the operators should behave in theory. I can't be sure it really works, because I would need to mock a source emission first.
Because of shareReplay(1), the throttled stream and the distinct stream always have their latest value cached. zip makes sure they are always emitted as a pair, and the idea is that zip emits whenever either stream emits, since both are shareReplay(1).
We always take the value from distinctStream, even when the zip is triggered by the throttled stream, because distinctStream always has its last value cached.
const throttleStream = source.pipe(throttle(val => interval(4000)), shareReplay(1));
const distinctStream = source.pipe(distinctUntilChanged(), shareReplay(1));
zip(throttleStream, distinctStream).pipe(
  // zip emits each pair as an array, so destructure it and keep the distinct value
  map(([t, d]) => d)
)
Upvotes: 0
Reputation: 2947
First I tried to somehow combine distinctUntilChanged() and throttleTime(), but I could not come up with a solution that way, so I tried something else.
The operator I came up with is throttleDistinct(), which works the way you would like: StackBlitz Editor Link
It has 2 parameters:
duration: number, the duration in milliseconds, similar to duration in throttleTime(duration: number)
equals: (a: T, b: T) => boolean, a function that compares whether the previous item is equal to the next item; its default implementation is (a, b) => a === b
import { fromEvent, Observable } from 'rxjs';
import { map, scan, filter } from 'rxjs/operators';
const source = fromEvent(document, 'keypress')
.pipe(map((x: any) => x.keyCode as number))
source
.pipe(
throttleDistinct(1000),
)
.subscribe((x) => console.log('__subscribe__', x));
export function throttleDistinct<T>(
duration: number,
equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
return (source: Observable<T>) => {
return source
.pipe(
map((x) => {
const obj = { val: x, time: Date.now(), keep: true };
return obj;
}),
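scan((acc, cur) => {
// acc is the last item that was kept; cur is the incoming item
const diff = cur.time - acc.time;
const isSame = equals(acc.val, cur.val)
// keep cur if enough time has passed or if it differs from the last kept value
return diff >= duration || !isSame
? { ...cur, keep: true }
: { ...acc, keep: false };
}),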
filter((x) => x.keep),
map((x) => x.val),
)
}
}
Upvotes: 7
Reputation:
Off the top of my head, you want to buffer by the time interval, then distinct within each buffer.
Effectively you want to restart / reboot the distinct run every n milliseconds.
source.pipe(
bufferTime(ms),
mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)
Upvotes: 1