DeBe
DeBe

Reputation: 501

RxJava: subscribeOn and observeOn not working as expected

maybe I just really understand the inner workings of subscribeOn and observeOn, but I recently encountered something really odd. I was under the impression, that subscribeOn determines the Scheduler where to initially start processing (especially when we, e.g., have a lot of maps which change the stream of data) and then observeOn can be used anywhere between those maps to change Schedulers when appropriate (first do networking, then computation, finally change UI thread).

However, I noticed that when not directly chaining those calls to my Observable or Single, it won't work. Here's a minimal working Example JUnit Test:

import org.junit.Test;
import rx.Single;
import rx.schedulers.Schedulers;

public class SubscribeOnTest {

  @Test public void not_working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    });
    single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }

  @Test public void working_as_expected() throws Exception {
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> {
      System.out.println("Doing some computation on thread " + Thread.currentThread().getName());
      int i = 1;
      singleSubscriber.onSuccess(i);
    }).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io());

    single.subscribe(integer -> {
      System.out.println("Observing on thread " + Thread.currentThread().getName());
    });
    System.out.println("Doing test on thread " + Thread.currentThread().getName());
    Thread.sleep(1000);
  }
}

The test not_working_as_expected() gives me following output

Doing some computation on thread main
Observing on thread main
Doing test on thread main

whereas working_as_expected() gives me

Doing some computation on thread RxComputationScheduler-1
Doing test on thread main
Observing on thread RxIoScheduler-2

The only difference being that in the first test, after the creation of the single there is a semicolon and only then the schedulers are applied, and in the working example the method calls are directly chained to the creation of the Single. But shouldn't that be irrelevant?

Upvotes: 2

Views: 3309

Answers (2)

koperko
koperko

Reputation: 2487

All "modifications" performed by operators are immutable, meaning that they return a new stream that receives notifications in an altered manner from the previous one. Since you just called subscribeOn and observeOn operators and didn't store their result, the subscription made later is on the unaltered stream.

One side note: I didn't quite understand your definition of subscribeOn behavior. If you meant that map operators are somehow affected by it, this is not true. subscribeOn defines a Scheduler, on which the OnSubscribe function is called. In your case the function you pass to the create() method. On the other hand, observeOn defines the Scheduler on which each successive stream (streams returned by applied operators) is handling emissions coming from an upstream.

Upvotes: 3

Divers
Divers

Reputation: 9569

.subscribeOn(*) - returns you new instance of Observable, but in first test you just ignore that and then subscribe on original Observable, which obviously by default subscribes on default, main thread.

Upvotes: 3

Related Questions