phacic

Reputation: 1552

RxJS distinctUntilChanged with timer

Is there a way to add a timer to distinctUntilChanged?

In RxJS 6:

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of(1, 2, 3, 3, 5, 6)
  .pipe(distinctUntilChanged())
  .subscribe(x => console.log(x));

My question is: is there a way to add a timer so that a repeated value is emitted if it arrives outside the time range, but ignored if it arrives within it?

For example, say the timer is set to 1000 ms (1 s), in this case something like interval(1000). A repeated value emitted within that 1 s should be ignored, but one emitted after the interval has elapsed should be allowed through. I hope I am making sense here.

Also, if there's a better way to do it without distinctUntilChanged, I will gladly welcome that.

Thanks in advance

Upvotes: 10

Views: 2732

Answers (4)

jajabarr

Reputation: 561

Just add a timestamp property in a preceding map, compare it inside distinctUntilChanged, and pluck the original value back out afterwards :)

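import { fromEvent } from 'rxjs';
import { distinctUntilChanged, map, pluck } from 'rxjs/operators';

fromEvent(document, 'keydown')
    .pipe(
        // tag every key with its arrival time
        map(e => ({key: e.key, time: Date.now()})),
        // a = last emitted item, b = current item: drop b only if it has the same key within 500 ms
        distinctUntilChanged((a, b) => (a.key === b.key) && (a.time > b.time - 500)),
        pluck('key')
    )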
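To make the comparator's effect easier to see, here is a minimal sketch of the same rule in plain TypeScript, without RxJS. The names `Stamped` and `distinctWithin` are hypothetical and exist only for this illustration. It assumes, as distinctUntilChanged does, that each item is compared against the last item that was let through rather than the last item that arrived.

```typescript
// Hypothetical shape for illustration: a value tagged with its arrival time in ms.
interface Stamped<T> {
  value: T;
  time: number;
}

// Keeps an event unless it repeats the last *kept* value within `windowMs`.
function distinctWithin<T>(events: Stamped<T>[], windowMs: number): T[] {
  const kept: T[] = [];
  let last: Stamped<T> | undefined;
  for (const e of events) {
    const isRecentRepeat =
      last !== undefined && last.value === e.value && e.time - last.time < windowMs;
    if (!isRecentRepeat) {
      kept.push(e.value);
      last = e; // only kept events become the new reference point
    }
  }
  return kept;
}

const keptKeys = distinctWithin(
  [
    { value: 'a', time: 0 },   // kept: first value
    { value: 'a', time: 200 }, // dropped: repeat within 500 ms
    { value: 'a', time: 700 }, // kept: 700 ms after the last kept 'a'
    { value: 'b', time: 750 }, // kept: different value
  ],
  500,
);
console.log(keptKeys); // ['a', 'a', 'b']
```

Note that the timer here restarts from the last value that got through, so holding a key down would let one repeat through roughly every 500 ms.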

Upvotes: 3

ibenjelloun

Reputation: 7733

It was nice to think about this issue. I ended up using a merge of the following two observables:

The simple distinctUntilChanged for the new values:

const untilChanged$ = sharedRequest$
 .pipe(distinctUntilChanged());

A gap filler that will resend the last value every n milliseconds (3000 here) if no new value was received:

const gapFiller$ = untilChanged$.pipe(
  switchMap(a => interval(3000).pipe(mapTo('Resend - ' + a)))
);

This Observable will create a new interval every time the untilChanged$ sends a new value.

Here is the stackblitz demonstration. Open the browser's console instead of the stackblitz console to see the time between each emission. I added a 'Resend' string before the values sent by the gapFiller$ Observable.

Upvotes: 1

martin

Reputation: 96979

This is one of the very rare occasions where windowTime is useful. windowTime creates a new Subject after each period of time and passes it downstream (a so-called higher-order Observable). You can then use concatMap to chain each of those windows with its own distinctUntilChanged. In effect, you create a new distinctUntilChanged every 1 s:

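import { Subject } from 'rxjs';
import { concatMap, distinctUntilChanged, windowTime } from 'rxjs/operators';

const subject = new Subject();

subject
  .pipe(
    windowTime(1000), // open a new window every 1 s
    concatMap(obs => obs.pipe(distinctUntilChanged())), // fresh distinctUntilChanged per window
  )
  .subscribe(console.log);

subject.next(1);
subject.next(2);

// Emitted in the second window, so the repeated 2 gets through.
setTimeout(() => subject.next(2), 1200);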

See live demo: https://stackblitz.com/edit/rxjs6-demo-pcmrzz?file=index.ts
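One consequence worth spelling out: the windows start at fixed intervals from subscription, not from the last emitted value, so two equal values on either side of a window boundary both get through even if they are only milliseconds apart. The following is a rough plain-TypeScript model of that behaviour, not the RxJS implementation; `TimedValue` and `distinctPerFixedWindow` are hypothetical names, and times are assumed to be milliseconds since subscription.

```typescript
// Hypothetical shape for illustration: a value and its time in ms since subscription.
interface TimedValue<T> {
  value: T;
  time: number;
}

// Models "a fresh distinctUntilChanged per fixed window of `windowMs`".
function distinctPerFixedWindow<T>(events: TimedValue<T>[], windowMs: number): T[] {
  const kept: T[] = [];
  let currentWindow = -1;
  let hasLast = false;
  let lastInWindow: T | undefined;
  for (const e of events) {
    const windowIndex = Math.floor(e.time / windowMs);
    if (windowIndex !== currentWindow) {
      // a new window forgets whatever was seen in the previous one
      currentWindow = windowIndex;
      hasLast = false;
    }
    if (!hasLast || lastInWindow !== e.value) {
      kept.push(e.value);
      lastInWindow = e.value;
      hasLast = true;
    }
  }
  return kept;
}

// Same sequence as in the answer: 1 and 2 immediately, 2 again after 1200 ms.
const fromAnswer = distinctPerFixedWindow(
  [{ value: 1, time: 0 }, { value: 2, time: 0 }, { value: 2, time: 1200 }],
  1000,
);
console.log(fromAnswer); // [1, 2, 2]

// Only 20 ms apart, but on different sides of the 1000 ms boundary.
const acrossBoundary = distinctPerFixedWindow(
  [{ value: 2, time: 990 }, { value: 2, time: 1010 }],
  1000,
);
console.log(acrossBoundary); // [2, 2]
```

If repeats must be suppressed for a full second after the last emitted value, a comparison against a timestamp of the last emitted item is the closer fit.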

Upvotes: 10

Picci

Reputation: 17762

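skipUntil should do what you are looking for, if I understood correctly.

Here is an example:

import { interval } from 'rxjs';
import { skipUntil, take, tap } from 'rxjs/operators';

// emits 0..9, one value every 100 ms
const obs = interval(100).pipe(
    tap(d => console.log('data emitted', d)),
    take(10),
);

obs
    .pipe(
        // ignore everything until the notifier fires once, after 500 ms
        skipUntil(interval(500).pipe(take(1)))
    )
    .subscribe(
        data => console.log('data read in the subscription', data)
    );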

Upvotes: 1
