Prashant Pandey

Reputation: 4642

Receiving items emitted from an Observable

Here's my code:

  public static void main(String[] args) {
    Observable.just("747", "737", "777")
        .flatMap(
            a -> {
              try {
                Thread.sleep(5000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              return Observable.just(a).subscribeOn(Schedulers.newThread());
            })
        .subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
  }

As I understand it, each item of the observable should be processed on a separate thread (which does happen), and the results should be delivered on the same thread that did the work (this happens as well). What I cannot understand is why the main thread does not exit, and instead waits for the background threads to finish. The program keeps running for as long as any of the background threads is running.

Upvotes: 1

Views: 52

Answers (1)

Malt

Reputation: 30285

If you look at a thread dump, you'll see that the main thread is actually stuck on the sleep statement. That's why it's not exiting.

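This is because main is the thread that executes the lambda you pass to flatMap, so the sleep blocks main. The subscribeOn(Schedulers.newThread()) call only applies to the inner Observable.just(a) that the lambda returns, which is created after the sleep has already finished. This is also why the code takes a long time to run: main sleeps for 5 seconds per item, one after another. You can easily verify it by inserting a print statement just before the sleep: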

try {
  System.out.println(Thread.currentThread().getName() + " is sleeping");
  Thread.sleep(5000);
} catch (InterruptedException e) {
  e.printStackTrace();
}

The output is something like this:

main is sleeping
main is sleeping
Received 747 on thread RxNewThreadScheduler-1
main is sleeping
Received 737 on thread RxNewThreadScheduler-2
Received 777 on thread RxNewThreadScheduler-3
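The same eager-versus-deferred distinction can be sketched in plain Java, without RxJava. This is only an analogy, not RxJava's implementation: the class `EagerVsDeferred` and the helpers `eagerMapper` and `deferredMapper` are hypothetical names made up for illustration, and a single-thread `ExecutorService` stands in for the scheduler. Code in the body of the mapper runs on whichever thread calls it; only code inside the returned `Callable` runs on the worker thread.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EagerVsDeferred {

    // Hypothetical stand-in for the original flatMap lambda: the "work"
    // (here just a log entry) happens immediately on the calling thread,
    // and only the trivial returned task is handed to the worker.
    static Callable<String> eagerMapper(String item, List<String> log) {
        log.add("eager:" + Thread.currentThread().getName());
        return () -> item;
    }

    // Hypothetical stand-in for the fromCallable version: the work is
    // wrapped inside the returned task, so it runs wherever the task runs.
    static Callable<String> deferredMapper(String item, List<String> log) {
        return () -> {
            log.add("deferred:" + Thread.currentThread().getName());
            return item;
        };
    }

    // Submits one task of each kind to a worker thread named "worker-1"
    // and returns the log of which thread did the work in each case.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService worker =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            Future<String> first = worker.submit(eagerMapper("747", log));
            Future<String> second = worker.submit(deferredMapper("737", log));
            first.get();
            second.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first log entry carries the name of the calling thread and the second carries `worker-1`, which mirrors the "main is sleeping" lines in the output above.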

I think you meant to write something like this:

System.out.println(Thread.currentThread().getName() + " is creating the observable");
Observable.just("747", "737", "777")
    .flatMap(a ->
        Observable.fromCallable(() -> {
          try {
            System.out.println(Thread.currentThread().getName() + " is sleeping");
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          return a;
        }).subscribeOn(Schedulers.newThread()))
    .subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
System.out.println(Thread.currentThread().getName() + " is going to exit");

Output:

main is creating the observable
main is going to exit
RxNewThreadScheduler-3 is sleeping
RxNewThreadScheduler-2 is sleeping
RxNewThreadScheduler-1 is sleeping
Received 777 on thread RxNewThreadScheduler-3
Received 747 on thread RxNewThreadScheduler-1
Received 737 on thread RxNewThreadScheduler-1

In this version, main exits right after the Observable is created and subscribed to, because the sleep now happens inside fromCallable, on the new threads.

Upvotes: 1
