Muhammad Hewedy
Muhammad Hewedy

Reputation: 30078

unable to test merge operator of rxjava

I wanted to test merge operator using rxjava 1 using:

    Observable<Object> o1 = Observable.create((t) -> {

        t.onNext("1");
        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        t.onNext("2");
        t.onCompleted();
    });

    Observable<Object> o2 = Observable.create((t) -> {
        t.onNext("a");
        t.onNext("b");
        t.onCompleted();
    });

    o1.mergeWith(o2).subscribe(System.out::println);

I expect the output to be: 1ab2 but the actual is 12ab.

So, why?

Upvotes: 0

Views: 109

Answers (1)

Kiskae
Kiskae

Reputation: 25603

Because merge subscribes to the Observables in a synchronous fashion. And since the first Observable does not explicitly move work to a background thread the body of create(...) is also executed synchronously.

This means that when subscribing to o1 it has to fully complete the body in the create(...) call before it subscribes to the next observable.

Explicitly making the observables run in the background using subscribeOn(Schedulers.computation()) would fix the issue.

See: http://reactivex.io/documentation/scheduler.html for more about Schedulers

Upvotes: 2

Related Questions