Reputation: 1552
Is there a way to add a timer to distinctUntilChanged?
In RxJS 6:
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 2, 3, 3, 5, 6)
  .pipe(distinctUntilChanged())
  .subscribe(x => console.log(x));
My question is: is there a way to add a timer so that a repeated value is emitted if it arrives outside the time range, but ignored if it arrives within it?
For example, say the timer is set to 1000 ms (1 s), in this case interval(1000).
A repeated value emitted within that 1 s should be ignored, but one emitted after the interval (1 s) has elapsed should be allowed through. I hope I'm making sense here.
Also, if there's a better way to do it without distinctUntilChanged,
I will gladly welcome that.
Thanks in advance.
Upvotes: 10
Views: 2732
Reputation: 561
Just add a timestamp property to each value in a map, compare it in distinctUntilChanged, and pluck the original value back out afterwards :)
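import { fromEvent } from 'rxjs';
import { map, distinctUntilChanged, pluck } from 'rxjs/operators';

fromEvent(document, 'keydown')
  .pipe(
    map(e => ({key: e.key, time: Date.now()})),
    // a is the previous value, b the current one: treat b as a repeat only if
    // the key matches and less than 500 ms have passed
    distinctUntilChanged((a, b) => (a.key === b.key) && (a.time > b.time - 500)),
    pluck('key')
  )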
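To make the comparator's behaviour easier to check, here is a minimal sketch of the same rule as a plain TypeScript function, without RxJS. The names Stamped and distinctWithin are hypothetical and only for illustration. It assumes, as distinctUntilChanged appears to do, that each value is compared against the last value that was actually let through.

```typescript
// A timestamped value, mirroring the {key, time} objects produced by map().
interface Stamped<T> {
  key: T;
  time: number; // milliseconds, e.g. from Date.now()
}

// Hypothetical helper (not part of RxJS): returns the keys that would pass
// a "distinct unless windowMs has elapsed" filter.
function distinctWithin<T>(events: Stamped<T>[], windowMs: number): T[] {
  const out: T[] = [];
  let last: Stamped<T> | undefined; // last value that was let through
  for (const e of events) {
    const isRepeat =
      last !== undefined && last.key === e.key && e.time - last.time < windowMs;
    if (!isRepeat) {
      out.push(e.key);
      last = e; // only emitted values become the new reference point
    }
  }
  return out;
}

const keys = distinctWithin(
  [
    { key: "a", time: 0 },
    { key: "a", time: 200 }, // repeat within 500 ms: dropped
    { key: "a", time: 400 }, // still within 500 ms of the "a" emitted at 0: dropped
    { key: "a", time: 600 }, // 600 ms after the last emitted "a": passes
    { key: "b", time: 650 }, // different key: passes
  ],
  500
);
console.log(keys); // ["a", "a", "b"]
```

Note that the window is measured from the last value that passed, so a key that is held down keeps being let through roughly once per window.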
Upvotes: 3
Reputation: 7733
It was nice to think about this issue. I ended up using a merge of the following two observables:
The simple distinctUntilChanged for the new values:
const untilChanged$ = sharedRequest$
.pipe(distinctUntilChanged());
A gap filler that resends the last value every n milliseconds if no new value was received:
const gapFiller$ = untilChanged$
.pipe(switchMap(a => interval(3000)
.pipe(mapTo('Resend - ' + a))));
This Observable creates a new interval every time untilChanged$ emits a new value.
Here is the StackBlitz demonstration; open the browser's console instead of the StackBlitz console to see the time between each emission. I added a 'Resend' string before the values sent by the gapFiller$ Observable.
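As a rough illustration of what the merged stream produces over time, here is a small model in plain TypeScript, without RxJS. Emission and mergedTimeline are hypothetical names, and the timestamps are idealised (real timers drift slightly). It assumes the distinct values arrive at known times and that the stream is observed until endMs.

```typescript
interface Emission {
  time: number; // ms since subscription
  value: string;
}

// Hypothetical model (not RxJS) of merging untilChanged$ with gapFiller$:
// each distinct value is emitted once, then resent every periodMs until the
// next distinct value arrives (switchMap cancels the previous interval).
function mergedTimeline(
  changes: Emission[],
  periodMs: number,
  endMs: number
): Emission[] {
  const out: Emission[] = [];
  changes.forEach((c, i) => {
    out.push(c); // the value itself, from untilChanged$
    const next = changes[i + 1];
    const stopAt = next ? next.time : endMs;
    for (let t = c.time + periodMs; t < stopAt; t += periodMs) {
      out.push({ time: t, value: "Resend - " + c.value });
    }
  });
  return out;
}

const timeline = mergedTimeline(
  [
    { time: 0, value: "a" },
    { time: 7000, value: "b" },
  ],
  3000,
  14000
);
console.log(timeline.map(e => e.time + ": " + e.value));
// ["0: a", "3000: Resend - a", "6000: Resend - a",
//  "7000: b", "10000: Resend - b", "13000: Resend - b"]
```

In this model the resends are driven by the timer rather than by a repeated value arriving from the source.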
Upvotes: 1
Reputation: 96979
This is one of the very rare occasions where windowTime is useful. windowTime creates a new Subject after each period of time and passes it downstream (a so-called higher-order Observable). You can then use concatMap to chain each window with distinctUntilChanged. This means you'll create a new distinctUntilChanged every 1 s:
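import { Subject } from 'rxjs';
import { windowTime, concatMap, distinctUntilChanged } from 'rxjs/operators';

const subject = new Subject();
subject
  .pipe(
    windowTime(1000),
    concatMap(obs => obs.pipe(distinctUntilChanged())),
  )
  .subscribe(console.log);
subject.next(1);
subject.next(2);
// arrives in the next 1 s window, so the repeated 2 passes through
setTimeout(() => subject.next(2), 1200);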
See live demo: https://stackblitz.com/edit/rxjs6-demo-pcmrzz?file=index.ts
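One consequence of this approach is that the windows are fixed 1 s slots counted from subscription, not from the last emitted value. The sketch below models that in plain TypeScript without RxJS; Timed and distinctPerWindow are hypothetical names, and the window boundaries are idealised to exact multiples of windowMs.

```typescript
interface Timed<T> {
  value: T;
  time: number; // ms since subscription
}

// Hypothetical model (not RxJS): fixed windows of windowMs starting at time 0,
// with a fresh "distinct until changed" check inside each window.
function distinctPerWindow<T>(events: Timed<T>[], windowMs: number): T[] {
  const out: T[] = [];
  let currentWindow = -1;
  let hasPrev = false;
  let prev: T | undefined;
  for (const e of events) {
    const w = Math.floor(e.time / windowMs);
    if (w !== currentWindow) {
      currentWindow = w;
      hasPrev = false; // new window: forget the previous value
    }
    if (!hasPrev || prev !== e.value) {
      out.push(e.value);
    }
    prev = e.value;
    hasPrev = true;
  }
  return out;
}

// Same sequence as the Subject example: 1 and 2 at once, 2 again after 1200 ms.
const demo = distinctPerWindow(
  [
    { value: 1, time: 0 },
    { value: 2, time: 10 },
    { value: 2, time: 1200 },
  ],
  1000
);
console.log(demo); // [1, 2, 2]

// Two repeats only 20 ms apart, but on either side of a window boundary.
const boundary = distinctPerWindow(
  [
    { value: 2, time: 990 },
    { value: 2, time: 1010 },
  ],
  1000
);
console.log(boundary); // [2, 2]
```

If repeats must be suppressed for a full second after the last emitted value, a timestamp comparison is probably a closer fit than fixed windows.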
Upvotes: 10
Reputation: 17762
skipUntil should do what you are looking for, if I understood correctly.
Here is an example:
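import { interval } from 'rxjs';
import { tap, take, skipUntil } from 'rxjs/operators';

// source: emits 0..9, one value every 100 ms
const obs = interval(100).pipe(
  tap(d => console.log('data emitted', d)),
  take(10),
);
obs
  .pipe(
    // ignore everything until the 500 ms notifier fires once
    skipUntil(interval(500).pipe(take(1)))
  )
  .subscribe(
    data => console.log('data read in the subscription', data)
  );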
Upvotes: 1