Reputation: 377
I need to implement a chain of operators, or perhaps a custom RxJava operator, on an Observable that suppresses repeated items until the value changes, but only for a short period (say 1 second); after that, the duplicate item may be emitted again.
Is what I need some nested combination of distinctUntilChanged and throttle?
Main requirements are:
I couldn't find any operator that matches my requirements, so I'll probably need to write a custom Rx operator, but I can't figure out where to start.
Upvotes: 2
Views: 1604
Reputation: 377
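I figured it out; it turned out to be quite easy to implement:

    // RxJava 2 import; for RxJava 3 use io.reactivex.rxjava3.core.Observable
    import io.reactivex.Observable

    // `time` is in milliseconds, the default unit of timestamp()
    fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
        return this
            .timestamp() // wrap each item with its arrival time
            .distinctUntilChanged { old, new ->
                // treat as a duplicate only if the value is equal AND it arrived within `time`
                new.value() == old.value() && new.time() - old.time() < time
            }
            .map { it.value() } // unwrap back to the original item
    }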
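
To make the comparer's rule concrete, here is a library-free TypeScript sketch that replays it over a list of timestamped items. `TimedItem` and `distinctWithin` are hypothetical names, not part of RxJava, and the sketch assumes each item is compared with the immediately preceding item, whether or not that one was emitted (which is how distinctUntilChanged appears to behave).

```typescript
// Hypothetical stand-in for RxJava's Timed<T>: a value plus its arrival time in ms.
interface TimedItem<T> {
  value: T;
  time: number;
}

// Drops an item when it equals the immediately preceding item and arrived
// less than windowMs after it. Assumption: "previous" is updated on every
// arrival, even when the current item is dropped.
function distinctWithin<T>(items: TimedItem<T>[], windowMs: number): T[] {
  const out: T[] = [];
  let prev: TimedItem<T> | undefined = undefined;
  for (const cur of items) {
    const duplicate =
      prev !== undefined &&
      cur.value === prev.value &&
      cur.time - prev.time < windowMs;
    prev = cur;
    if (!duplicate) out.push(cur.value);
  }
  return out;
}

const sample: TimedItem<string>[] = [
  { value: "a", time: 0 },    // emitted: first item
  { value: "a", time: 400 },  // dropped: same value, 400 ms after previous
  { value: "b", time: 500 },  // emitted: value changed
  { value: "a", time: 600 },  // emitted: differs from previous "b"
  { value: "a", time: 1700 }, // emitted: 1100 ms after previous "a"
];
console.log(distinctWithin(sample, 1000)); // a, b, a, a
```

Under that assumption, a duplicate that keeps arriving more often than once per window would stay suppressed, because every arrival resets the comparison point.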
Upvotes: 2
Reputation: 96891
You can do this with just the groupBy operator (shown here in RxJS). Each group$ Observable is piped through throttleTime, and every emission goes through its own group$, so all subsequent emissions of the same item are ignored for 1s:

    source$
      .pipe(
        groupBy(item => item.whatever),
        mergeMap(group$ => group$.pipe(
          throttleTime(1000)
        )),
      )
      .subscribe(...);
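
As a rough illustration of what the groupBy + throttleTime combination does, the following library-free TypeScript sketch applies a per-key throttle to a list of timestamped items. `Stamped` and `throttlePerKey` are hypothetical names, and the sketch assumes throttleTime's default leading-edge behaviour; the handling of an item arriving exactly at the window boundary is a simplification.

```typescript
// Hypothetical helper type: a value plus its arrival time in ms.
interface Stamped<T> {
  value: T;
  time: number;
}

// Emits an item only if no item with the same key was emitted in the
// preceding windowMs. Each key gets its own independent window, which
// mirrors giving every group its own throttle.
function throttlePerKey<T, K>(
  items: Stamped<T>[],
  keyOf: (v: T) => K,
  windowMs: number,
): T[] {
  const lastEmitted = new Map<K, number>();
  const out: T[] = [];
  for (const { value, time } of items) {
    const key = keyOf(value);
    const last = lastEmitted.get(key);
    if (last === undefined || time - last >= windowMs) {
      lastEmitted.set(key, time); // start a new window for this key
      out.push(value);
    }
  }
  return out;
}

const events: Stamped<string>[] = [
  { value: "a", time: 0 },    // emitted
  { value: "b", time: 100 },  // emitted: different key
  { value: "a", time: 200 },  // dropped: "a" was emitted 200 ms ago
  { value: "a", time: 1200 }, // emitted: window for "a" has passed
];
console.log(throttlePerKey(events, v => v, 1000)); // a, b, a
```

Note the difference from a strict distinctUntilChanged: in an a, b, a sequence inside one window, the second a is dropped even though a different item came in between.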
Upvotes: 3