Reputation: 315
I have a set of observables that I run in parallel, such as localObservable and networkObservable. As soon as networkObservable starts emitting items, I need only those items, so anything emitted by localObservable should be discarded from that point on (localObservable may not even have started yet).
Observable<Integer> localObservable =
    Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io());
Observable<Integer> networkObservable =
    Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io());
Upvotes: 2
Views: 1566
Reputation: 4012
There is an easy solution to this using a single operator: amb.
Run the test below and look at the System.out output to see which source gets subscribed and which gets disposed.
Documentation: http://reactivex.io/documentation/operators/amb.html
Basically, you subscribe to both observables at the same time, and whichever observable emits first gets through. The other observable is unsubscribed.
@Test
public void ambTest() throws Exception {
    // Virtual-time scheduler, so the test does not actually wait
    TestScheduler testScheduler = new TestScheduler();

    // Emits 1, 2, 3 after 1000 ms of virtual time
    Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler)
        .concatMap(aLong -> Observable.just(1, 2, 3))
        .doOnSubscribe(disposable -> System.out.println("connect network"))
        .doOnDispose(() -> System.out.println("dispose network"));

    // Emits 4, 5, 6 after 500 ms of virtual time
    Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler)
        .concatMap(aLong -> Observable.just(4, 5, 6))
        .doOnSubscribe(disposable -> System.out.println("connect local"))
        .doOnDispose(() -> System.out.println("dispose local"));

    Observable<Integer> integerObservable = Observable.ambArray(network, local);
    TestObserver<Integer> test = integerObservable.test();

    // After 600 ms only local has emitted, so it wins
    testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
    test.assertValues(4, 5, 6);

    // Past the network delay: still only the local values
    testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    test.assertValues(4, 5, 6);
}
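To make the selection rule concrete without pulling in RxJava, here is a minimal, dependency-free sketch that models it on virtual timestamps. It is not how RxJava implements amb; the names AmbSketch, Event, amb and burstAt are hypothetical helpers for illustration only, and an empty list stands for a source that never signals.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the "first emitter wins" rule; not RxJava's implementation.
class AmbSketch {

    // Hypothetical helper type: a value emitted at a virtual time in milliseconds.
    static final class Event {
        final long timeMs;
        final int value;

        Event(long timeMs, int value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // Returns the values of the source whose first event is earliest.
    // Assumptions of this sketch: each source is sorted by time, ties go to
    // the source listed first, and an empty source never signals.
    @SafeVarargs
    static List<Integer> amb(List<Event>... sources) {
        List<Event> winner = null;
        for (List<Event> source : sources) {
            if (source.isEmpty()) {
                continue; // never emits, so it cannot win
            }
            if (winner == null || source.get(0).timeMs < winner.get(0).timeMs) {
                winner = source;
            }
        }
        List<Integer> out = new ArrayList<>();
        if (winner != null) {
            for (Event e : winner) {
                out.add(e.value);
            }
        }
        return out;
    }

    // Builds a source that emits all given values at the same virtual time.
    static List<Event> burstAt(long timeMs, int... values) {
        List<Event> events = new ArrayList<>();
        for (int v : values) {
            events.add(new Event(timeMs, v));
        }
        return events;
    }

    public static void main(String[] args) {
        List<Event> network = burstAt(1000, 1, 2, 3);
        List<Event> local = burstAt(500, 4, 5, 6);
        System.out.println(amb(network, local)); // prints [4, 5, 6]
    }
}
```

Note that under this rule the winner is simply whoever emits first: with the timings used in the test, the local source wins and the network source is the one that gets disposed.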
Upvotes: 1
Reputation: 10267
You can do something like this:
Observable<Long> networkObservable =
    Observable.interval(1000, 500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .share();
Observable<Long> localObservable =
    Observable.interval(500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .takeUntil(networkObservable);
Observable.merge(networkObservable, localObservable)
    .subscribe(System.out::println);
This will output:
0 // localObservable
1 // localObservable
0 // networkObservable from here on
1
2
...
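takeUntil makes localObservable stop and unsubscribe when the first emission from networkObservable happens. The merged Observable therefore emits from localObservable as long as networkObservable has not started emitting; once it does, the merged Observable stops emitting from localObservable and emits only from networkObservable. The share() call matters here: networkObservable is subscribed twice (once by merge and once by takeUntil), and share() lets both subscribers use a single underlying subscription.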
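The switching rule described above can be modeled without RxJava as a small, dependency-free sketch on virtual timestamps. This is only an illustration of the semantics, not the library's implementation; TakeUntilMergeSketch and mergeWithTakeUntil are hypothetical names, the emitted values are interval-style counters, and a local item that falls exactly on the first network timestamp is dropped here (in real concurrent code that case is a race).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of merge(network, local.takeUntil(network)).
class TakeUntilMergeSketch {

    // localTimes and networkTimes are sorted emission times in milliseconds.
    // Each source emits the counters 0, 1, 2, ... at those times.
    static List<String> mergeWithTakeUntil(long[] localTimes, long[] networkTimes) {
        // Local items are allowed only strictly before the first network item.
        long cutoff = networkTimes.length > 0 ? networkTimes[0] : Long.MAX_VALUE;

        List<String> out = new ArrayList<>();
        // Every surviving local item precedes the first network item in time,
        // so time order is simply "local prefix, then all network items".
        for (int i = 0; i < localTimes.length && localTimes[i] < cutoff; i++) {
            out.add("local:" + i);
        }
        for (int i = 0; i < networkTimes.length; i++) {
            out.add("network:" + i);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] local = {400, 800, 1200, 1600};
        long[] network = {1000, 1500, 2000};
        // prints [local:0, local:1, network:0, network:1, network:2]
        System.out.println(mergeWithTakeUntil(local, network));
    }
}
```

If the network source never emits, the cutoff is never reached and all local items pass through, which matches the "emit from local as long as network has not started" behavior.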
Upvotes: 8