Patroy
Patroy

Reputation: 377

Rx - How to apply to stream distinctUntilChanged that allows emit duplicate item after some period?

So, I have to implement some chain or perhaps some custom RxJava operator to an Observable, that will distinct items emitted from Observable, until they change, but only for some short period (like 1 sec), then duplicated item can be emitted again.

What I need is some nested combination of distinctUntilChanged, with throttle?

Main requirements are:

I couldn't find any operator that matches my requirement so, probably I'll need to write some custom Rx operator, but still I can't figure out how to start

Upvotes: 2

Views: 1604

Answers (2)

Patroy
Patroy

Reputation: 377

So, I figured out, turned out to be quite easy to implement:

fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
    return this
        .timestamp()
        .distinctUntilChanged { old, new ->
            new.value() == old.value() && new.time() - old.time() < time
        }
        .map { it.value() }
}

Upvotes: 2

martin
martin

Reputation: 96891

You can do this with just groupBy operator. Since each group$ is piped with throttleTime and each emission goes through this group$ Observable it will ignore all subsequent emission for 1s:

source$
  .pipe(
    groupBy(item => item.whatever),
    mergeMap(group$ => group$.pipe(
      throttleTime(1000)
    )),
  )
  .subscribe(...);

Upvotes: 3

Related Questions