msangel
msangel

Reputation: 10362

Create an Observable by combining other Observables by sampling from first when in second is emiting event

I need to get combining of two observers when one is an infinite data source, and another is an indicator for getting last value from first. I will draw an image here what I want: enter image description here

I have looked on http://reactivex.io/documentation/operators.html#combining but none has fit my requirements.

Upvotes: 3

Views: 604

Answers (4)

Maxim Volgin
Maxim Volgin

Reputation: 4077

public final Observable sample(Observable sampler) http://reactivex.io/RxJava/javadoc/rx/Observable.html#sample(rx.Observable)

Upvotes: 4

Dmitry Zaytsev
Dmitry Zaytsev

Reputation: 23952

This can be achieved quite easily with flatMap:

observable2
    .flatMap(value -> observable1.take(1))

Upvotes: 2

JohnWowUs
JohnWowUs

Reputation: 3083

Try withLatestFrom (it's marked experimental but it has been in the RxJava API for a while now)

Observable<Long> interval = Observable.interval(500, TimeUnit.MILLISECONDS);
Observable<Long> slowerInterval = Observable.interval(2, TimeUnit.SECONDS);
slowerInterval.withLatestFrom(interval, new Func2<Long, Long, Long>() {
     @Override
            public Long call(Long first, Long second) {
                return second;
            }
})  

Upvotes: 2

ehehhh
ehehhh

Reputation: 1074

I created a small example with interval operators.

Observable<Long> interval = Observable.interval(500, TimeUnit.MILLISECONDS);
Observable<Long> slowerInterval = Observable.interval(2, TimeUnit.SECONDS);
Observable.combineLatest(
            interval,
            slowerInterval,
            new Func2<Long, Long, Long>() {

                Long previousSecond = null;

                @Override
                public Long call(Long first, Long second) {
                    if (!second.equals(previousSecond)) {
                        previousSecond = second;
                        return first;
                    } else {
                        return null;
                    }
                }
            })
            .filter(value -> value != null)
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onNext(Long value) {
                    // Value gets emitted here every time slowerInterval emits
                }

                ...
            });

Explanation: Because of the way combineLatest works, I needed to cache the second observables value and check if it has changed before emitting a value from first observable downstream. If it hasn't changed I emit null, which is then filtered out. There probably is a more elegant way to do this, but I couldn't think of one.

Upvotes: 1

Related Questions