Reputation: 4642
Here's my code:
public static void main(String[] args) {
    Observable.just("747", "737", "777")
        .flatMap(
            a -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return Observable.just(a).subscribeOn(Schedulers.newThread());
            })
        .subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
}
As I understand it, each item of the observable should run on a separate thread (which does happen), and the results should be delivered on the same thread that did the work (this happens as well). What I cannot understand is why the main thread does not exit and instead waits for the background threads to finish. The program keeps running for as long as each of the background threads is running.
Upvotes: 1
Views: 52
Reputation: 30285
If you look at a thread dump, you'll see that the main thread is actually stuck on the sleep statement. That's why it's not exiting.
The main thread is the one executing the function passed to the flatMap operator, so it is the one that sleeps. This is also why the code takes so long to run: the three sleeps happen one after another on main, about 15 seconds in total. You can easily verify this by inserting a print statement just before sleep:
try {
    System.out.println(Thread.currentThread().getName() + " is sleeping");
    Thread.sleep(5000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
The output is something like this:
main is sleeping
main is sleeping
Received 747 on thread RxNewThreadScheduler-1
main is sleeping
Received 737 on thread RxNewThreadScheduler-2
Received 777 on thread RxNewThreadScheduler-3
I think you meant to write something like this:
System.out.println(Thread.currentThread().getName() + " is creating the observable");
Observable.just("747", "737", "777")
    .flatMap(a ->
        Observable.fromCallable(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + " is sleeping");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return a;
        }).subscribeOn(Schedulers.newThread())
    ).subscribe(p -> System.out.println("Received " + p + " on thread " + Thread.currentThread().getName()));
System.out.println(Thread.currentThread().getName() + " is going to exit");
Output:
main is creating the observable
main is going to exit
RxNewThreadScheduler-3 is sleeping
RxNewThreadScheduler-2 is sleeping
RxNewThreadScheduler-1 is sleeping
Received 777 on thread RxNewThreadScheduler-3
Received 747 on thread RxNewThreadScheduler-1
Received 737 on thread RxNewThreadScheduler-1
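In this version, main exits right after the Observable is created. fromCallable defers the sleep until the inner Observable is subscribed to, and subscribeOn moves that subscription onto a new thread, so main never runs the sleep itself.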
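The difference between the two versions comes down to which thread runs the slow step. Here is a minimal sketch of the same idea using only java.util.concurrent rather than RxJava, so it is an analogy and not a drop-in replacement. The names EagerVsDeferred, slowStep, eager, deferred and worker-1 are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerVsDeferred {

    // Hypothetical stand-in for the slow work; returns the name of the thread that ran it.
    static String slowStep() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Eager: slowStep() runs on the caller before anything is handed to the pool,
    // much like sleeping directly inside the flatMap lambda.
    static String eager(Executor pool) {
        String ranOn = slowStep();
        return CompletableFuture.supplyAsync(() -> ranOn, pool).join();
    }

    // Deferred: slowStep() is passed as a task, so the pool thread runs it,
    // much like wrapping the sleep in fromCallable(...).subscribeOn(...).
    static String deferred(Executor pool) {
        return CompletableFuture.supplyAsync(EagerVsDeferred::slowStep, pool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            System.out.println("eager slow step ran on:    " + eager(pool));
            System.out.println("deferred slow step ran on: " + deferred(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

When started normally, the first line should report the main thread and the second should report worker-1. Unlike the RxJava version, this sketch calls join() so that main waits for each result. That is only there to make the output easy to compare.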
Upvotes: 1