Reputation: 13471
I'm trying to use subscribeOn and observeOn with an Executor so that I can get back to the main thread once the async task finishes. I ended up with this code, but it does not work:
@Test
public void testBackToMainThread() throws InterruptedException {
    processValue(1);
    processValue(2);
    processValue(3);
    processValue(4);
    processValue(5);
    // while (tasks.size() != 0) {
    //     tasks.take().run();
    // }
    System.out.println("done");
}

private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

private void processValue(int value) throws InterruptedException {
    Observable.just(value)
            .subscribeOn(Schedulers.io())
            .doOnNext(number -> processExecution())
            .observeOn(Schedulers.from(command -> tasks.add(command)))
            .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
    tasks.take().run();
}

private void processExecution() {
    System.out.println("Execution in " + Thread.currentThread().getName());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Any idea how to accomplish what I want?
When I run it, it only prints:
Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done
Regards
Upvotes: 0
Views: 1325
Reputation: 13471
I just updated my code with akanord's suggestion, but with this approach each task seems to block the next one, so everything ends up running sequentially.
With the code:
@Test
public void testBackToMainThread() throws InterruptedException {
    processValue(1);
    processValue(2);
    processValue(3);
    processValue(4);
    processValue(5);
    System.out.println("done");
}

private void processValue(int value) throws InterruptedException {
    BlockingScheduler scheduler = new BlockingScheduler();
    scheduler.execute(() -> Flowable.just(value)
            .subscribeOn(Schedulers.io())
            .doOnNext(number -> processExecution())
            .observeOn(scheduler)
            .doAfterTerminate(() -> scheduler.shutdown())
            .subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
}

private void processExecution() {
    System.out.println("Execution in " + Thread.currentThread().getName());
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
And the output
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done
What I want to achieve is this output
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done
So each time the main thread runs the pipeline, onNext should run on another thread and the method should return immediately. Once the other thread finishes, the rest of the pipeline should continue back on the main thread.
Upvotes: 0
Reputation: 69997
The problem with your approach is that you can't know how many tasks should be executed at a given time without risking a deadlock while waiting for tasks that are only queued after you unblock the main thread.
Returning to the Java main thread is not supported by any extension to 1.x that I know of. For 2.x, there is the BlockingScheduler from the extensions project, which allows you to do that:
public static void main(String[] args) {
    BlockingScheduler scheduler = new BlockingScheduler();
    scheduler.execute(() -> {
        Flowable.range(1, 10)
                .subscribeOn(Schedulers.io())
                .observeOn(scheduler)
                .doAfterTerminate(() -> scheduler.shutdown())
                .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
    });
    System.out.println("BlockingScheduler finished");
}
Note the call to scheduler.shutdown(), which has to be called eventually to release the main thread; otherwise your program may never terminate.
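For readers who want to see the underlying idea without RxJava, here is a minimal plain-Java sketch of "do the slow work elsewhere, then come back to the calling thread". It is only an illustration built on java.util.concurrent, not how BlockingScheduler is implemented; the class MainThreadLoopSketch and its methods runAll and slowWork are hypothetical names. It avoids the deadlock problem described above by knowing in advance how many callbacks to wait for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration; not part of RxJava or its extensions project.
class MainThreadLoopSketch {

    // Starts `count` slow jobs on a thread pool, then runs each job's
    // callback on the thread that called this method.
    static List<String> runAll(int count) {
        BlockingQueue<Runnable> mainQueue = new LinkedBlockingQueue<>();
        ExecutorService io = Executors.newCachedThreadPool();
        List<String> log = new ArrayList<>(); // only touched by the caller thread
        Thread caller = Thread.currentThread();

        for (int i = 1; i <= count; i++) {
            final int value = i;
            io.execute(() -> {
                slowWork(); // runs on a pool thread, all jobs in parallel
                // Post the continuation back to the caller's queue.
                mainQueue.add(() -> log.add(value + " delivered on caller thread: "
                        + (Thread.currentThread() == caller)));
            });
        }

        try {
            // Exactly `count` callbacks are expected, so the loop knows when to stop.
            for (int done = 0; done < count; done++) {
                mainQueue.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return log;
    }

    static void slowWork() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        runAll(5).forEach(System.out::println);
        System.out.println("done");
    }
}
```

All five jobs sleep concurrently, and the five callbacks then run on the calling thread in completion order, which is not necessarily 1 to 5. The counting loop plays the role that scheduler.shutdown() plays in the answer's code: something has to tell the blocked thread that no more work is coming.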
Upvotes: 1
Reputation: 4681
This problem does not occur in RxJava 2, so it is recommended to use RxJava 2.
I compared RxJava 1.2.7 and RxJava 2.0.7 and found the root cause; I am still looking for a solution.
In RxJava 1.2.7, you can see at ObservableObserveOn#145 that it schedules the task when you call request. This means Executor.execute is called as soon as you subscribe, so your task queue accepts the Runnable immediately. You then take and run that Runnable (which is actually an ExecutorSchedulerWorker), but the upstream's onNext hasn't been called yet (because you sleep for 2000 ms), so ObserveOnSubscriber#213 returns null. When the upstream eventually calls onNext(Integer), the task will never be run.
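The sequence of events described above can be replayed with a small single-threaded model. This is a deliberately simplified sketch, not RxJava source code: the class EarlyDrainSketch, the buffer queue and the drain task are hypothetical stand-ins for the operator's internals, and the assumption that a second drain gets queued when the value arrives is part of the model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified model with hypothetical names; not the actual RxJava code.
class EarlyDrainSketch {

    static List<String> simulate() {
        Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();  // the question's executor queue
        Queue<Integer> buffer = new ConcurrentLinkedQueue<>();  // values waiting to be delivered
        List<String> log = new ArrayList<>();

        // What a scheduled task does in this model: deliver a buffered value, if any.
        Runnable drain = () -> {
            Integer v = buffer.poll();
            log.add(v == null ? "drain: nothing to deliver" : "drain: delivered " + v);
        };

        // 1. Subscribing requests data, which queues a drain task right away.
        tasks.add(drain);

        // 2. The main thread takes and runs it before the upstream has emitted.
        tasks.poll().run();

        // 3. The upstream finishes its 2000 ms sleep and emits; a new drain is queued.
        buffer.add(1);
        tasks.add(drain);

        // 4. Nobody takes from the queue again, so that drain stays pending.
        log.add("pending tasks never run: " + tasks.size());
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
        // prints:
        // drain: nothing to deliver
        // pending tasks never run: 1
    }
}
```

In this model, the single tasks.take().run() in the question's processValue corresponds to step 2: it consumes the early task, and nothing is left to consume the one that matters.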
Upvotes: 1