1048576

Reputation: 715

subscribeOn main thread and observeOn caller thread in RxJava2

I'm wrapping legacy code for a library that is not thread-safe. When a client of the library calls an API off the main thread, I need to switch to the main thread to do the work and then switch back to the caller's thread to return the results.

I thought I could use (this is for Android but the question is more generic)

internal object TransformCompletableTemporarilySwitchToMainThread : CompletableTransformer {
    override fun apply(upstream: Completable): CompletableSource {
        return upstream
                .observeOn(Schedulers.trampoline())
                .subscribeOn(AndroidSchedulers.mainThread())
    }
}

Is there something like RxJava 1's Schedulers.immediate()? I know that for testing you can replace Schedulers.immediate() with Schedulers.trampoline(), but from the documentation and from the tests I ran, Schedulers.trampoline() doesn't seem to have much in common with Schedulers.immediate(). Is there an alternative way to do it?

ADDED

/**
 * Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker}
 * instances queue work and execute them in a FIFO manner on one of the participating threads.
 * <p>
 * The default implementation's {@link Scheduler#scheduleDirect(Runnable)} methods execute the tasks on the current thread
 * without any queueing and the timed overloads use blocking sleep as well.
 * <p>
 * Note that this scheduler can't be reliably used to return the execution of
 * tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided
 * by RxJava itself but may be found in external libraries.
 * <p>
 * This scheduler can't be overridden via an {@link RxJavaPlugins} method.
 * @return a {@link Scheduler} that queues work on the current thread
 */

What do these two parts mean?

  • Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker} instances queue work and execute them in a FIFO manner on one of the participating threads.

and

  • @return a {@link Scheduler} that queues work on the current thread

Upvotes: 0

Views: 624

Answers (1)

akarnokd

Reputation: 69997

Neither the immediate nor the trampoline scheduler is suitable for returning to a specific thread. For that you need a single-threaded scheduler, which you can create from an ExecutorService:

ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler singleThreaded = Schedulers.from(exec);

Observable.fromCallable(() -> api.init())
.subscribeOn(singleThreaded)
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(v -> System.out.println("Initialized"))
.observeOn(singleThreaded)
.map(v -> api.getData(v))
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(v -> System.out.println("Some data: " + v))
.observeOn(singleThreaded)
.doOnNext(v -> api.close())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(v -> System.out.println("Done"))
.subscribe(); // nothing runs until the chain is subscribed to

// later
exec.shutdown();
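
To illustrate why a single-threaded executor can serve as a "return" target, here is a minimal plain-JDK sketch (no RxJava involved; the class and method names are hypothetical). It only checks that every task handed to such an executor runs on the same worker thread, which is the property Schedulers.from(exec) relies on in the chain above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of RxJava or the original answer.
class SingleThreadReturn {

    // Returns true if two separately submitted tasks ran on the same
    // worker thread, and that thread is not the calling thread.
    static boolean sameThreadForAllTasks() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Each task simply reports the thread it is running on.
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread first = exec.submit(whoAmI).get();
            Thread second = exec.submit(whoAmI).get();
            return first == second && first != Thread.currentThread();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sameThreadForAllTasks());
    }
}
```

Because the executor owns exactly one worker thread, "observeOn(singleThreaded)" always lands on that thread, whereas trampoline just keeps running on whichever thread happens to be emitting.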

Edit:

It is not possible to return to an arbitrary thread. However, if a thread has a Looper/Handler, you can turn that Looper into a Scheduler with AndroidSchedulers.from(Looper) and target it via observeOn.
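
The reason behind this can be sketched without Android: a thread can only be "returned to" if it is actively draining a queue of tasks, which is what a Looper does. The following plain-JDK sketch is an illustration under that assumption, not RxJava or RxAndroid API; the class name, method name, and the "legacy result" string are all hypothetical. The caller thread blocks on its own queue, a worker does the job elsewhere, and the continuation is posted back to the caller's queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: a hand-rolled, one-shot "looper" for the caller thread.
class CallerQueueSketch {

    // Returns true if the continuation ran on the thread that called this method.
    static boolean resultDeliveredOnCaller() {
        BlockingQueue<Runnable> callerQueue = new LinkedBlockingQueue<>();
        Thread caller = Thread.currentThread();
        boolean[] ranOnCaller = {false};

        // Stand-in for the thread the legacy library must run on.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                String result = "legacy result"; // placeholder for the real call
                // Post the continuation to the caller's queue instead of running it here.
                callerQueue.add(() -> {
                    ranOnCaller[0] = Thread.currentThread() == caller
                            && result.equals("legacy result");
                });
            });

            // The caller must block and drain its queue; without this step
            // there is nothing that could bring execution back to this thread.
            callerQueue.take().run();
            return ranOnCaller[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultDeliveredOnCaller());
    }
}
```

A thread that is not sitting in such a loop (or in a Looper, or inside an executor) has no place where a posted task could be picked up, which is why an arbitrary caller thread cannot be targeted.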

Upvotes: 2
