lukstei
lukstei

Reputation: 886

RxJava async subscription

I have a list of tasks which should be handled one-by-one in a new thread and then the result should be displayed in a method by some main thread. However this doesn't seem to work, the flatMap method is invoked in the main thread.

Why does not the subscribeOn method handle the "thread switch" in this case?

What would be a better pattern to execute some work in another thread? (except from using Observable.create and creating a new thread manually, which is very verbose)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }

                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread

Upvotes: 4

Views: 2178

Answers (1)

James World
James World

Reputation: 29776

Caveat: I'm not a Java programmer, but C#, so all the weird camel case method names freak me out and confuse me.

The subscribeOn is in the wrong place if you want a new thread for the flatMap operation. Insert it between from and flatMap. See this answer for a full explanation of subscribeOn and observeOn - it's written for .NET but the principles are the same.

I'm not familiar with tasks in Java, so I'm not sure if your Task is like .NET's Task and whether task.call() is asynchronous and launches its own thread - I guess not from your question since you said "...list of tasks which should be handled one-by-one in a new thread".

A newThread scheduler uses a new thread per subscriber - since flatMap will make a single subscription, all task.call invocations will be made on the same thread, distinct though that will be from the from operator's thread.

If task.call actually is asynchronous then the results will come back according to however it introduces concurrency and that will be independent of Rx's semantics.

Either way, the (correctly placed) observeOn will cause the results to be passed to this::show on the main thread.

Upvotes: 3

Related Questions