Sander
Sander

Reputation: 1193

RxJs: distinctUntilChanged still emits duplicate values

I have a stream of values that I limit using a lower and upper bound, and to decrease the amount of logging I would like to only emit values if they change. The problem is that the second distinctUntilChanged() in the following snippet still produces duplicates:

Observable // emits i.e. [2, 2, 2, 5, 5, 10, 20]
.distinctUntilChanged() // becomes [2, 5, 10, 20]
.map(target => {
  const correctedTarget = Math.min(Math.max(target, MINIMUM), MAXIMUM); // Let's say min: 5, max: 10
  if(correctedTarget != target) {
    logger.warn(`Prediction wants to scale out-of-bounds to ${target}, limiting to ${correctedTarget}`);
  }
  return correctedTarget;
}) // becomes [5, 5, 10, 10]
.distinctUntilChanged() // should be [5, 10], but is [5, 5, 10, 10]

The RxJs docs state that the filter defaults to a simple equality comparison, so I'd expect that this should Just Work™.

Upvotes: 4

Views: 6225

Answers (2)

Simon_Weaver
Simon_Weaver

Reputation: 145930

Just adding this here because this question was a top match for 'duplicate values'.

This following was emitting duplicate values for me.

BAD - Emits the time continuously! Crashes browser!

The intent here is to output the getCurrentTime() value of a YouTube video.

this.currentTime = this.timer.pipe(
    filter(i => this.isReady),
    map(i => this.ytPlayer.getCurrentTime(), distinctUntilChanged())
);

GOOD - Returns distinct values

this.currentTime = this.timer.pipe(
    filter(i => this.isReady),
    map(i => this.ytPlayer.getCurrentTime()),
    distinctUntilChanged()
);

I'll leave it to the reader to spot the difference. Note that these both did compile with typescript.

Upvotes: 0

Sander
Sander

Reputation: 1193

One of my colleagues (once again) identified the problem (also thanks for the help Matt). It turns out that the initial assumption was wrong - the outline of the code was as follows:

Observable // emits credentials on some interval
.flatmap(credentials => {
  return Observable2.doSomething()
         .distinctUntilChanged()
         ...
})

As you can see, the distinctUntilChanged was chained to Observable2, which is a new observable stream every time the credentials get emitted. This also explains why the comparator function I made doesn't get called at all: there is just one value every time so there is nothing to compare to.

Upvotes: 1

Related Questions