Reputation: 501
Maybe I just don't really understand the inner workings of subscribeOn and observeOn, but I recently encountered something really odd. I was under the impression that subscribeOn determines the Scheduler on which processing initially starts (especially when we have, e.g., a lot of maps that transform the stream of data), and that observeOn can then be used anywhere between those maps to change Schedulers when appropriate (first do networking, then computation, finally switch to the UI thread).
However, I noticed that this doesn't work when those calls are not chained directly onto my Observable or Single. Here's a minimal working example as a JUnit test:
import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;

public class SubscribeOnTest {

    @Test
    public void not_working_as_expected() throws Exception {
        Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
            System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
            int i = 1;
            singleSubscriber.onSuccess(i);
        });
        single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
        single.subscribe(integer -> {
            System.out.println("Observing on thread " + Thread.currentThread().getName());
        });
        System.out.println("Doing test on thread " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }

    @Test
    public void working_as_expected() throws Exception {
        Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
            System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
            int i = 1;
            singleSubscriber.onSuccess(i);
        }).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());
        single.subscribe(integer -> {
            System.out.println("Observing on thread " + Thread.currentThread().getName());
        });
        System.out.println("Doing test on thread " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
}
The test not_working_as_expected() gives me the following output:
Doing some computation on thread main
Observing on thread main
Doing test on thread main
whereas working_as_expected() gives me:
Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2
The only difference is that in the first test there is a semicolon after the creation of the Single and the schedulers are applied only afterwards, whereas in the working example the method calls are chained directly onto the creation of the Single. But shouldn't that be irrelevant?
Upvotes: 2
Views: 3309
Reputation: 2487
Operators never modify the stream they are called on: each one returns a new stream that receives notifications from the previous one in an altered manner. Since you only called the subscribeOn and observeOn operators and didn't store their result, the subscription made later is on the unaltered stream.
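As a rough illustration of why the discarded result matters, here is a minimal sketch in plain Java. MiniSingle and DiscardedOperatorDemo are hypothetical names made up for this example; they are not RxJava classes, and a plain Executor stands in for a Scheduler. The sketch only mimics the "operator returns a new instance" behavior described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical toy type, NOT RxJava's Single: it only models
// "an operator returns a new instance and leaves the receiver untouched".
final class MiniSingle<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    MiniSingle(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Returns a NEW MiniSingle whose source function runs on the executor.
    MiniSingle<T> subscribeOn(Executor executor) {
        return new MiniSingle<T>(observer ->
                executor.execute(() -> onSubscribe.accept(observer)));
    }

    void subscribe(Consumer<T> observer) {
        onSubscribe.accept(observer);
    }
}

class DiscardedOperatorDemo {
    // Returns the name of the thread on which the source function ran.
    static String sourceThread(boolean keepResult) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            MiniSingle<Integer> single = new MiniSingle<Integer>(observer -> {
                ranOn.complete(Thread.currentThread().getName());
                observer.accept(1);
            });
            if (keepResult) {
                single = single.subscribeOn(pool); // subscribe to the new instance
            } else {
                single.subscribeOn(pool);          // result discarded, as in the first test
            }
            single.subscribe(value -> { });
            return ranOn.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("discarded: " + sourceThread(false));
        System.out.println("kept:      " + sourceThread(true));
    }
}
```

Run from the main thread, this prints main for the discarded case and worker for the kept one. With the real RxJava API, the equivalent fix is to subscribe to the value returned by the operator chain, which is what working_as_expected() in the question does.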
One side note: I didn't quite understand your definition of subscribeOn's behavior. If you meant that map operators are somehow affected by it, that is not true. subscribeOn defines the Scheduler on which the OnSubscribe function is called; in your case, that is the function you pass to the create() method. observeOn, on the other hand, defines the Scheduler on which each successive stream (the streams returned by the operators applied after it) handles emissions coming from upstream.
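The division of labor between the two operators can be sketched with a small toy model in plain Java. ToySingle and SchedulerPlacementDemo are hypothetical names invented for this example, not RxJava types, and Executors stand in for Schedulers. In this model the first map runs on the subscribeOn thread only because the source happens to emit there, not because subscribeOn acts on map itself; everything after observeOn runs on the second thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical toy model, NOT RxJava: just enough to trace threads.
final class ToySingle<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    ToySingle(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Moves the subscription (and so the source function) onto the executor.
    ToySingle<T> subscribeOn(Executor executor) {
        return new ToySingle<T>(down ->
                executor.execute(() -> onSubscribe.accept(down)));
    }

    // Re-dispatches the emitted value, so everything after this call runs on the executor.
    ToySingle<T> observeOn(Executor executor) {
        return new ToySingle<T>(down ->
                onSubscribe.accept(value -> executor.execute(() -> down.accept(value))));
    }

    // Transforms the value on whatever thread it arrives on.
    <R> ToySingle<R> map(Function<T, R> mapper) {
        return new ToySingle<R>(down ->
                onSubscribe.accept(value -> down.accept(mapper.apply(value))));
    }

    void subscribe(Consumer<T> observer) {
        onSubscribe.accept(observer);
    }
}

class SchedulerPlacementDemo {
    static String name() {
        return Thread.currentThread().getName();
    }

    // Returns "stage:thread" entries in the order the stages ran.
    static List<String> trace() {
        ExecutorService subPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sub-thread"));
        ExecutorService obsPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "obs-thread"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            new ToySingle<Integer>(down -> {
                        log.add("source:" + name());
                        down.accept(1);
                    })
                    .map(i -> { log.add("map1:" + name()); return i + 1; })
                    .subscribeOn(subPool)
                    .observeOn(obsPool)
                    .map(i -> { log.add("map2:" + name()); return i * 10; })
                    .subscribe(i -> {
                        log.add("subscriber:" + name());
                        done.complete(null);
                    });
            done.join(); // wait until the subscriber has been called
            return new ArrayList<>(log);
        } finally {
            subPool.shutdown();
            obsPool.shutdown();
        }
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

In this sketch the source and the first map are logged on sub-thread, while the second map and the subscriber are logged on obs-thread.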
Upvotes: 3
Reputation: 9569
.subscribeOn(*) returns a new instance of Observable (or, here, of Single), but in the first test you just ignore it and then subscribe to the original one, which by default subscribes on the calling thread, here the main thread.
Upvotes: 3