paul
paul

Reputation: 13471

SubscribeOn and observeOn in the main thread

I´m trying to use subscribeOn and obsereOn with an Executor to allow me back to the main thread once the async task finish. I end up with this code but it does not work

@Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
//        while (tasks.size() != 0) {
//            tasks.take().run();
//        }
        System.out.println("done");
    }

    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();


    private void processValue(int value) throws InterruptedException {
        Observable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(Schedulers.from(command -> tasks.add(command)))
                .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
        tasks.take().run();
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Any idea how to accomplish what I want?

When I run I only printing

Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done

Regards

Upvotes: 0

Views: 1325

Answers (3)

paul
paul

Reputation: 13471

I just update my code with suggestion of akanord but this aproach it seems to block one task to the other, and just end up running sequential.

With the code:

   @Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
        System.out.println("done");
    }

    private void processValue(int value) throws InterruptedException {
        BlockingScheduler scheduler = new BlockingScheduler();
        scheduler.execute(() -> Flowable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(scheduler)
                .doAfterTerminate(() -> scheduler.shutdown())
                .subscribe(v -> System.out.println(v + " on " + Thread.currentThread())));
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

And the output

Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
2 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
3 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
4 on Thread[main,5,main]
Execution in RxCachedThreadScheduler-1
5 on Thread[main,5,main]
done

What I want to achieve is this output

Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
Execution in RxCachedThreadScheduler-1
1 on Thread[main,5,main]
2 on Thread[main,5,main]
3 on Thread[main,5,main]
4 on Thread[main,5,main]
5 on Thread[main,5,main]
done

So every time the main thread run the pipeline run the onNext in another thread and then it return from the method until the another thread finish and make it the main thread back to the pipeline.

Upvotes: 0

akarnokd
akarnokd

Reputation: 69997

The problem with your approach is that you can't know how many tasks should be executed at a given time and also not deadlock on waiting for tasks that should happen after you unblock the main thread.

Returning to the Java main thread is not supported by any extension to 1.x I know. For 2.x, there is the BlockingScheduler from the extensions project that allows you to do that:

public static void main(String[] args) {
    BlockingScheduler scheduler = new BlockingScheduler();

    scheduler.execute(() -> {
        Flowable.range(1,10)
        .subscribeOn(Schedulers.io())
        .observeOn(scheduler)
        .doAfterTerminate(() -> scheduler.shutdown())
        .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
    });

    System.out.println("BlockingScheduler finished");
}

Note the call to scheduler.shutdown() which has to be called eventually to release the main thread, otherwise your program may never terminate.

Upvotes: 1

Dean Xu
Dean Xu

Reputation: 4681

Your question will not happen in RxJava2. It's recommanded to use RxJava2.

I compared RxJava-1.2.7 and RxJava-2.0.7 and found the root cause. And now I am looking for the solution.

In RxJava-1.2.7.You can see ObservableObserveOn#145 and find it schedule the task when you call request. It means it will call Executor.execute when you subscribe on it. So your task queue accept the Runnable immediately. And then you take and run the Runnable (which is actual ExecutorSchedulerWorker) but the upstream's onNext haven't been called (because you sleep 2000ms). It will return null on ObserveOnSubscriber#213. When upstream call onNext(Integer), the task will never be run.

Upvotes: 1

Related Questions