I have a List of ConnectableObservable, and I want to run each item in the list only once the previous item is done. I've tried applying the concat() method to the list, but apparently this method doesn't work on ConnectableObservables. How can I do this?
This is what I've tried:
    ConnectableObservable<Long> observable1 =
            Observable.timer(1500, TimeUnit.MILLISECONDS).publish();
    ConnectableObservable<Long> observable2 =
            Observable.timer(1550, TimeUnit.MILLISECONDS).publish();

    List<ConnectableObservable<Long>> list = new ArrayList<>();
    list.add(observable1);
    list.add(observable2);

    Observable.concat(list).doOnNext(aLong -> {
        Log.i("result", aLong.toString());
    }).subscribe();

    observable1.connect();
    observable2.connect();
Here, observable2 emits 50 milliseconds after observable1 completes, not 1550 milliseconds as expected.
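You didn't invoke the connect() method on the ConnectableObservables, so they never started publishing anything.
Put
    observable1.connect();
    observable2.connect();
at the end of your code.
Alternatively, you can add .autoConnect(1) to observable1 and observable2. Each source then connects when its first subscriber arrives, and concat() subscribes to observable2 only after observable1 completes. Note that autoConnect() returns a plain Observable, so the variable and list types change accordingly.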
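For illustration, here is a minimal sketch of the autoConnect(1) variant. It assumes RxJava 3 package names (RxJava 2 uses io.reactivex.* instead) and swaps the real clock for a TestScheduler so the timing is deterministic. The class name AutoConnectConcatSketch and the helper arrivalTimes() are made up for this example.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question or answer.
public class AutoConnectConcatSketch {

    /** Returns the virtual time (in ms) at which each item reached the subscriber. */
    static List<Long> arrivalTimes() {
        // Assumption: a TestScheduler replaces the default computation scheduler.
        TestScheduler scheduler = new TestScheduler();

        // autoConnect(1) connects each source when its first subscriber arrives.
        Observable<Long> first = Observable.timer(1500, TimeUnit.MILLISECONDS, scheduler)
                .publish()
                .autoConnect(1);
        Observable<Long> second = Observable.timer(1550, TimeUnit.MILLISECONDS, scheduler)
                .publish()
                .autoConnect(1);

        List<Observable<Long>> sources = new ArrayList<>();
        sources.add(first);
        sources.add(second);

        List<Long> times = new ArrayList<>();
        // concat() subscribes to `second` only after `first` has completed.
        Observable.concat(sources)
                .subscribe(item -> times.add(scheduler.now(TimeUnit.MILLISECONDS)));

        // Move the virtual clock forward far enough for both timers to fire.
        scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
        return times;
    }

    public static void main(String[] args) {
        System.out.println(arrivalTimes());
    }
}
```

Under those assumptions the two items should arrive at 1500 ms and 3050 ms of virtual time, which means the second timer starts only once the first one has completed.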
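concat() does not relay the two sources simultaneously: it emits everything from observable1 before anything from observable2, as the following JUnit test confirms: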
    @Test
    void connectableTest() {
        TestScheduler testScheduler = new TestScheduler();

        // Emits 1, 2, 3 at 0 s, 1 s and 2 s after connect().
        ConnectableObservable<Integer> observable1 =
                Observable
                        .just(1, 2, 3)
                        .zipWith(
                                Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
                                (integer, time) -> integer)
                        .publish();

        // Emits 4, 5, 6 at 3 s, 4 s and 5 s after connect().
        ConnectableObservable<Integer> observable2 =
                Observable
                        .just(4, 5, 6)
                        .zipWith(
                                Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler),
                                (integer, time) -> integer)
                        .publish();

        List<ConnectableObservable<Integer>> list = new ArrayList<>();
        list.add(observable1);
        list.add(observable2);

        TestObserver<Integer> output = Observable.concat(list).test();

        observable1.connect();
        observable2.connect();

        // Advance virtual time far enough for both sources to finish.
        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);

        output.assertValues(1, 2, 3, 4, 5, 6).assertComplete();
    }
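One caveat worth spelling out: in the test above, observable2's first tick (3 s) comes after observable1 has completed (2 s), so nothing is lost. Once connect() is called, a ConnectableObservable runs whether or not concat() has subscribed to it yet. The sketch below reproduces the timing from the question under that reading. It assumes RxJava 3 package names and a TestScheduler in place of the real clock; EagerConnectSketch and arrivalTimes() are hypothetical names.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question or answer.
public class EagerConnectSketch {

    /** Returns the virtual time (in ms) at which each item reached the subscriber. */
    static List<Long> arrivalTimes() {
        // Assumption: a TestScheduler stands in for the real clock.
        TestScheduler scheduler = new TestScheduler();

        ConnectableObservable<Long> first =
                Observable.timer(1500, TimeUnit.MILLISECONDS, scheduler).publish();
        ConnectableObservable<Long> second =
                Observable.timer(1550, TimeUnit.MILLISECONDS, scheduler).publish();

        List<ConnectableObservable<Long>> sources = new ArrayList<>();
        sources.add(first);
        sources.add(second);

        List<Long> times = new ArrayList<>();
        Observable.concat(sources)
                .subscribe(item -> times.add(scheduler.now(TimeUnit.MILLISECONDS)));

        // Both timers start counting here, at virtual time 0.
        first.connect();
        second.connect();

        scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
        return times;
    }

    public static void main(String[] args) {
        System.out.println(arrivalTimes());
    }
}
```

With both sources connected at the same moment, the items should arrive at 1500 ms and 1550 ms, which matches the 50 ms gap described in the question. If the second timer is meant to start only after the first finishes, connecting lazily (for example with autoConnect(1)) is one way to get there.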