Reputation: 30078
I wanted to test merge
operator using rxjava 1 using:
Observable<Object> o1 = Observable.create((t) -> {
t.onNext("1");
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
t.onNext("2");
t.onCompleted();
});
Observable<Object> o2 = Observable.create((t) -> {
t.onNext("a");
t.onNext("b");
t.onCompleted();
});
o1.mergeWith(o2).subscribe(System.out::println);
I expect the output to be: 1ab2
but the actual is 12ab
.
So, why?
Upvotes: 0
Views: 109
Reputation: 25603
Because merge
subscribes to the Observables in a synchronous fashion. And since the first Observable does not explicitly move work to a background thread the body of create(...)
is also executed synchronously.
This means that when subscribing to o1
it has to fully complete the body in the create(...)
call before it subscribes to the next observable.
Explicitly making the observables run in the background using subscribeOn(Schedulers.computation())
would fix the issue.
See: http://reactivex.io/documentation/scheduler.html for more about Schedulers
Upvotes: 2