Reputation: 886
I have a list of tasks which should be handled one-by-one in a new thread and then the result should be displayed in a method by some main thread.
However this doesn't seem to work, the flatMap
method is invoked in the main thread.
Why does not the subscribeOn
method handle the "thread switch" in this case?
What would be a better pattern to execute some work in another thread? (except from using Observable.create
and creating a new thread manually, which is very verbose)
List<Task> tasks = ...;
Observable.from(tasks)
.flatMap(task -> {
// should be handled in a new thread
try {
return Observable.just(task.call());
} catch (Exception e) {
log.error("Error", e);
}
return Observable.empty();
})
.subscribeOn(Schedulers.newThread())
.observeOn(MySchedulers.main())
.subscribe(this::show); // subscribe called from main thread
Upvotes: 4
Views: 2178
Reputation: 29776
Caveat: I'm not a Java programmer, but C#, so all the weird camel case method names freak me out and confuse me.
The subscribeOn
is in the wrong place if you want a new thread for the flatMap
operation. Insert it between from
and flatMap
. See this answer for a full explanation of subscribeOn
and observeOn
- it's written for .NET but the principles are the same.
I'm not familiar with tasks in Java, so I'm not sure if your Task
is like .NET's Task
and whether task.call()
is asynchronous and launches its own thread - I guess not from your question since you said "...list of tasks which should be handled one-by-one in a new thread".
A newThread
scheduler uses a new thread per subscriber - since flatMap
will make a single subscription, all task.call
invocations will be made on the same thread, distinct though that will be from the from
operator's thread.
If task.call
actually is asynchronous then the results will come back according to however it introduces concurrency and that will be independent of Rx's semantics.
Either way, the (correctly placed) observeOn
will cause the results to be passed to this::show
on the main thread.
Upvotes: 3