xymelon
xymelon

Reputation: 315

rxjava switch observable if second observable start emits items

I have some set of observable which I am executing in parallel, like localObservable and networkObservable. If the networkObservable starts emit items (from this time, I need only these items), then discard items emitted by localObservable (maybe localObservable has not yet started).

Observable<Integer> localObservable =
            Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
            Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());

Upvotes: 2

Views: 1566

Answers (2)

Sergej Isbrecht
Sergej Isbrecht

Reputation: 4012

There is an easy solutions to this by an operator: AMB

Just look at the output of the System.out.

Documentation: http://reactivex.io/documentation/operators/amb.html

Basically you subscribe to both observable at the same time and whatever observable first emits gets through. The other observable will be unsubscribed.

@Test
public void ambTest() throws Exception {
    TestScheduler testScheduler = new TestScheduler();

    Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
                .concatMap(aLong -> Observable.just(1, 2, 3))
                .doOnSubscribe(disposable -> System.out.println("connect network"))
                .doOnDispose(() -> System.out.println("dispose network"));

    Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
                .concatMap(aLong -> Observable.just(4, 5, 6))
                .doOnSubscribe(disposable -> System.out.println("connect local"))
                .doOnDispose(() -> System.out.println("dispose local"));

    Observable<Integer> integerObservable = Observable.ambArray(network, local);

    TestObserver<Integer> test = integerObservable.test();

    testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);

    test.assertValues(4, 5, 6);

    testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

    test.assertValues(4, 5, 6);
}

Upvotes: 1

yosriz
yosriz

Reputation: 10267

you can do something like this:

 Observable<Long> networkObservable =
            Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .share();
    Observable<Long> localObservable =
            Observable.interval(500, TimeUnit.MILLISECONDS)                       
                    .subscribeOn(Schedulers.io())
                    .takeUntil(networkObservable);

    Observable.merge(networkObservable, localObservable)
            .subscribe(System.out::println);

this will output:

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on
1
2
...

takeUntil will make localObservable to stop and unsubscribe when the first emission from networkObservable happened, so the merged Observable will emit from localObservable as long networkObservable didn't started, and when it does, it will stop emitting from localObservable and switch to emit only from networkObservable.

Upvotes: 8

Related Questions