Reputation: 10362
I need to combine two observables, where one is an infinite data source and the other acts as a trigger for emitting the latest value from the first.
I have drawn an image of what I want:
I have looked at http://reactivex.io/documentation/operators.html#combining but none of the operators there fits my requirements.
Upvotes: 3
Views: 604
Reputation: 4077
public final <U> Observable<T> sample(Observable<U> sampler)
http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(rx.Observable)
Upvotes: 4
Reputation: 23952
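This can be achieved quite easily with flatMap:
// Note: this yields the latest value only if observable1 replays its most recent item on
// subscription (e.g. a BehaviorSubject); a cold observable1 restarts on every subscription.
observable2
    .flatMap(value -> observable1.take(1))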
Upvotes: 2
Reputation: 3083
Try withLatestFrom (it's marked experimental, but it has been in the RxJava API for a while now):
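// Fast source: emits 0, 1, 2, ... every 500 ms
Observable<Long> interval = Observable.interval(500, TimeUnit.MILLISECONDS);
// Trigger: emits every 2 seconds
Observable<Long> slowerInterval = Observable.interval(2, TimeUnit.SECONDS);
slowerInterval.withLatestFrom(interval, new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long first, Long second) {
        // first comes from slowerInterval, second is the latest value of interval
        return second;
    }
})
.subscribe(value -> System.out.println(value));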
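To make the semantics concrete without depending on RxJava, here is a minimal plain-Java sketch of what withLatestFrom does in this scenario. The class LatestValueGate and its methods onSource and onTrigger are hypothetical names invented for illustration; they are not part of RxJava, and the sketch ignores threading, completion, and errors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: remembers the most recent source value
// and emits it only when the trigger fires.
public class LatestValueGate<T> {
    private T latest;          // most recent value seen from the source
    private boolean hasValue;  // stays false until the source has emitted once
    private final List<T> emitted = new ArrayList<>();

    // Called for every item of the fast, infinite source.
    public void onSource(T value) {
        latest = value;
        hasValue = true;
    }

    // Called for every item of the trigger; dropped if the source is still silent.
    public void onTrigger() {
        if (hasValue) {
            emitted.add(latest);
        }
    }

    public List<T> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LatestValueGate<Long> gate = new LatestValueGate<>();
        gate.onTrigger();   // dropped: no source value yet
        gate.onSource(0L);
        gate.onSource(1L);
        gate.onTrigger();   // emits 1
        gate.onTrigger();   // emits 1 again: the latest value is reused
        gate.onSource(2L);
        gate.onTrigger();   // emits 2
        System.out.println(gate.emitted()); // prints [1, 1, 2]
    }
}
```

The second trigger re-emitting 1 is the main behavioral difference from sample(sampler), which emits nothing on a tick if the source produced no new item since the previous tick.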
Upvotes: 2
Reputation: 1074
I created a small example with interval operators:
Observable<Long> interval = Observable.interval(500, TimeUnit.MILLISECONDS);
Observable<Long> slowerInterval = Observable.interval(2, TimeUnit.SECONDS);
Observable.combineLatest(
        interval,
        slowerInterval,
        new Func2<Long, Long, Long>() {
            Long previousSecond = null;
            @Override
            public Long call(Long first, Long second) {
                if (!second.equals(previousSecond)) {
                    previousSecond = second;
                    return first;
                } else {
                    return null;
                }
            }
        })
    .filter(value -> value != null)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onNext(Long value) {
            // Value gets emitted here every time slowerInterval emits
        }
        ...
    });
Explanation: Because of the way combineLatest works, I needed to cache the second observable's value and check whether it has changed before emitting a value from the first observable downstream. If it hasn't changed, I emit null, which is then filtered out. There is probably a more elegant way to do this, but I couldn't think of one.
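The caching logic described above can be isolated and exercised without RxJava. The following is only a sketch: ChangeGate is a hypothetical class name, and the input pairs are a hand-written sequence standing in for what combineLatest might deliver, not measured output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for the combiner plus null filter from the answer.
public class ChangeGate {
    private Long previousSecond = null; // last trigger value that was let through

    // Returns first only when second differs from the cached value, else null.
    public Long combine(Long first, Long second) {
        if (!Objects.equals(second, previousSecond)) {
            previousSecond = second;
            return first;
        }
        return null;
    }

    public static void main(String[] args) {
        ChangeGate gate = new ChangeGate();
        List<Long> out = new ArrayList<>();
        // Illustrative (first, second) pairs; combineLatest fires whenever either side emits.
        long[][] pairs = { {3, 0}, {4, 0}, {5, 0}, {6, 0}, {7, 0}, {7, 1}, {8, 1} };
        for (long[] p : pairs) {
            Long result = gate.combine(p[0], p[1]);
            if (result != null) { // plays the role of .filter(value -> value != null)
                out.add(result);
            }
        }
        System.out.println(out); // prints [3, 7]
    }
}
```

One consequence of this design is that it relies on consecutive trigger values being different. That holds for interval, which counts upward, but a trigger that emits the same value twice in a row would have its second emission swallowed.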
Upvotes: 1