user2455862
user2455862

Reputation: 678

RX Java - different behaviour of subscribeOn and observeOn

I'm new to RX java and I've been experimenting with observeOn and subscribeOn methods. I've read that the difference between them is that they affect the whole chain(subscribeOn) or only the part of the chain after setting a scheduler(observeOn). So why the code below executes fine (prints the current thread):

Observable obs = Observable.from(Arrays.asList("element", "2nd element"));
obs.observeOn(Schedulers.newThread())
                .map(x -> x.toString().toUpperCase())
                .subscribe(x -> System.out.println("NT:" + Thread.currentThread().getName() + x));

while this code doesn't print anything:

Observable obs = Observable.from(Arrays.asList("element", "2nd element"));
obs.subscribeOn(Schedulers.newThread())
            .map(x -> x.toString().toUpperCase())
            .subscribe(x -> System.out.println("NT:" + Thread.currentThread().getName() + x));

Upvotes: 4

Views: 148

Answers (2)

Maxim Volgin
Maxim Volgin

Reputation: 4077

Basically, an observable (more specifically, a cold observable) is a delayed calculation. So, .subscribeOn() defined on which scheduler/thread this calculation will be done, and .observeOn() defines on which scheduler/thread you will receive its result. For instance, network request should run on Schedulers.io which you specify by adding .subscribeOn(Schedulers.io) and you want to display results on main thread by specifying .observeOn(AndroidShedulers.main)

Upvotes: 0

Peter Samokhin
Peter Samokhin

Reputation: 874

Are you sure that this code doesn't print anything?

I tried this code:

Observable obs = Observable.from(Arrays.asList("element", "2nd element"));
obs.subscribeOn(Schedulers.newThread())
  .map(x -> x.toString().toUpperCase())
  .subscribe(x -> System.out.println("NT:" + Thread.currentThread().getName() + x));

Thread.sleep(5000);

Output:

NT:RxNewThreadScheduler-1ELEMENT
NT:RxNewThreadScheduler-12ND ELEMENT

Maybe you forgot to sleep or do some another work to make application wait for completion of new RxJava threads?

Upvotes: 1

Related Questions