Estevanbs
Estevanbs

Reputation: 150

Concat ConnectableObservables

I have a List of ConnectableObservable, and I want to run one item from the list when the previous item is done. I've tried applying concat() method on the list, but apparently this method doesn't work on ConnectableObservables. How can I do this?

This is what I've tried:

ConnectableObservable<Long> observable1 =
    Observable.timer(1500, TimeUnit.MILLISECONDS).publish();

ConnectableObservable<Long> observable2 =
    Observable.timer(1550, TimeUnit.MILLISECONDS).publish();


List<ConnectableObservable<Long>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);

Observable.concat(list).doOnNext(aLong -> {
    Log.i("result", aLong.toString());
}).subscribe();

observable1.connect();
observable2.connect();

Here, observable2 runs 50 milliseconds after observable1 completes, not 1550 as expected.

Upvotes: 0

Views: 107

Answers (1)

mtw
mtw

Reputation: 144

You didn't invoke connect() method on ConnectableObservable, so it hadn't started to publish anything.

Put

observable1.connect();
observable2.connect();

at the end of your code. Alternatively you can add .autoConnect(1) to your observable1 and observable2.

It doesn't work simultaneously as following junit confirms it:

    @Test
    void connectableTest() {
        TestScheduler testScheduler = new TestScheduler();
        ConnectableObservable<Integer> observable1 =
                Observable
                        .just(1, 2, 3)
                        .zipWith(
                                Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();
        ConnectableObservable<Integer> observable2 =
                Observable
                        .just(4, 5, 6)
                        .zipWith(
                                Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();

        List<ConnectableObservable<Integer>> list = new ArrayList<>();
        list.add(observable1);
        list.add(observable2);

        TestObserver<Integer> output = Observable.concat(list).test();

        observable1.connect();
        observable2.connect();

        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
        output.assertValues(1,2,3,4,5,6).assertComplete();
    }

Upvotes: 0

Related Questions