erhise
erhise

Reputation: 311

RxJS throttle same value but let new values through

"Here you have", someone says and you are given this input stream of values that you somewhat want to do distinctUntilChanged() upon...

Input:  '1-1----11---2--1122----1---2---2-2-1-2---|'
Output: '1-----------2--1-2-----1---2-------1-2---|'

Nothing weird so far,
But now someone says "it's okey" if the same value comes again, "but only if it's not to soon!". I want at least '----' ticks between the same value. "Okey" you say and you add a throttle

const source = new Subject<number>();

// mysterious cave troll is randomly source.next(oneOrTwo)

const example = source.pipe(throttle(val => interval(4000)));

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'

"That's not what I want! Look at all the value you missed", referring to that you throttle in regards to all values being streamed.

Input:  '1-1----11---2--1122----1---2---2-2-1-2-----|'
Output: '1------1----2----2-----1-------2-----2-----|'
        '-------------->1<--------->2<----->1<------|' <-- Missed values

"Here, let me show show you" the mysterious man says and gives you this

Wanted output

Input:  '1-1----11---2--1112----1---2---2-2-1-2-----|'
Output: '1------1----2--1--2----1---2-----2-1-------|'

My answer to this is that it feels like a combined window wouldn't do.

From someone more experienced,
is this a hard problem to solve? (or have I missed an obvious solution)

Upvotes: 8

Views: 2383

Answers (5)

erhise
erhise

Reputation: 311

I found a solution that works, does someone have any take on this?

source.pipe(
   windowTime(4000),
   concatMap(obs => obs.pipe(distinct()))
);

Examples from before, in a StackBlitz example

UPDATE: this does not actually work 100%. It only take the current window into consideration. So you can for example have

`[1-12][2---]` which would give `1--22---|`

where [----] would represent the time window. In other words, if a value is first emitted last in one window and emitted first in the next window, the same value will pass through right after each other.

Thanks @eric99 for making me realize this.

Upvotes: 0

user8745435
user8745435

Reputation:

This is my second attempt, it filters the stream by output (rather than taking distinctUntil) then throttles and merges the two streams.

Of course, we may not have a known set of values (1,2,...n).
If I can figure out that wrinkle, will add a further example.

const output = merge(
  source.pipe( filter(x => x === 1), throttle(val => interval(ms))),
  source.pipe( filter(x => x === 2), throttle(val => interval(ms)))
)

Here is my check (ms = 4000)

input         1-1----11---2--1112----1---2---2-2-1-2-----
expected      1------1----2--1--2----1---2-----2-1-------

filter(1)     1-1----11------111-----1-----------1-------
throttle(1)   1------1-------1-------1-----------1-------

filter(2)     ------------2-----2--------2---2-2---2-----
throttle(2)   ------------2-----2--------2-----2---------

merged        1------1----2--1--2----1---2-----2-1-------
expected      1------1----2--1--2----1---2-----2-1-------

Extending to n values

I think this will work where the set of values in the stream is not known in advance (or has a large range so extending the previous answer is impractical).

It should work as long as the source completes.

merge(
  source.pipe(
    distinct().pipe(
      mapTo(distinctVal => source.pipe( 
        filter(val = val === distinctVal), 
        throttle(val => interval(ms))
      )
    )  
  )
)

I don't have a proof yet, will post that next.

Upvotes: 1

Fan Cheung
Fan Cheung

Reputation: 11345

Here is a tricky solution base on theory of operators, but I can't sure it really works, because I will need to mock a source emission first.

So throttle and distinct stream always have the latest value cached, zip make sure they always got emitted in pair, zip will always emit when any of the stream emit because it's shareReplay(1).

We always take the value emit from distinctStream, even when zip stream is trigger by throttle, because distinctStream always have the last cached value.

const throttleStream= source.pipe(throttle(val => interval(4000)),shareReplay(1))
const distinctStream= source.pipe(distinctUntilChanged(),shareReplay(1))
zip(throttleStream,distinctStream).pipe(
   map((t,d)=>d)
)

Upvotes: 0

Goga Koreli
Goga Koreli

Reputation: 2947

First I came up with idea to somehow combine distinctUntilChanged() and throttleTimte(), however it was not possible for me to come up with solution and then I tried something else.

The operator I came up with is throttleDistinct() that works as you would like to: StackBlit Editor Link

It has 2 parameters which are:

  1. duration: number which is in milliseconds and is similar to duration in throttleTime(duration: number)
  2. equals: (a: T, b: T) => boolean which is function to compare if previous item is equal to next item, which has default implementation of (a, b) => a === b

import { of, fromEvent, interval, Observable } from 'rxjs';
import { map, scan, filter, } from 'rxjs/operators';

const source = fromEvent(document, 'keypress')
  .pipe(map((x: any) => x.keyCode as number))

source
  .pipe(
    throttleDistinct(1000),
  )
  .subscribe((x) => console.log('__subscribe__', x));

export function throttleDistinct<T>(
  duration: number,
  equals: (a: T, b: T) => boolean = (a, b) => a === b
) {
  return (source: Observable<T>) => {
    return source
      .pipe(
        map((x) => {
          const obj = { val: x, time: Date.now(), keep: true };
          return obj;
        }),
        scan((acc, cur) => {
          const diff = cur.time - acc.time;

          const isSame = equals(acc.val, cur.val)
          return diff > duration || (diff < duration && !isSame)
            ? { ...cur, keep: true }
            : { ...acc, keep: false };
        }),
        filter((x) => x.keep),
        map((x) => x.val),
      )
  }
}

Upvotes: 7

user8745435
user8745435

Reputation:

Off the top of my head, you want to buffer by the time interval, then distinct within each buffer.

Effectively you want to restart / reboot the distinct run every n milliseconds.

source.pipe(
  bufferTime(ms),
  mergeMap(bufferArray => from(bufferArray).pipe(distinctUntilChanged()) )
)

Upvotes: 1

Related Questions