wujek

Reputation: 11070

RxJava .subscribeOn(Schedulers.newThread()) questions

I am on plain JDK 8. I have this simple RxJava example:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));

and it prints the words line by line, interleaved with the thread information, which is 'main' for every doOnNext() call, as expected.

However, when I uncomment the subscribeOn(Schedulers.newThread()) call, nothing is printed at all. Why isn't it working? I would have expected it to start a new thread for each onNext() call, and doOnNext() to print that thread's name. Instead I see nothing, and the same happens with the other schedulers.

When I add a call to Thread.sleep(10000L) at the end of my main, I can see the output, which suggests that the threads used by RxJava are all daemons. Is this the case? Can this be changed somehow by using a custom ThreadFactory or a similar concept, without having to implement a custom Scheduler?

With the mentioned change, the thread name is always RxNewThreadScheduler-1, whereas the documentation for newThread says "Scheduler that creates a new {@link Thread} for each unit of work". Isn't it supposed to create a new thread for all of the emissions?

Upvotes: 2

Views: 3675

Answers (3)

NickUnuchek

Reputation: 12897

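// Imports assume RxJava 1.x (package rx), matching the question's Observable.from(...).
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class MainClass {

    public static void main(String[] args) {

        // defaultThreadFactory() creates non-daemon threads, so this pool keeps the JVM alive after main() returns.
        Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10, Executors.defaultThreadFactory()));

        Observable.interval(1, TimeUnit.SECONDS)
                // Prints the thread that delivers each tick.
                .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                        Thread.currentThread().getName()))
                .subscribeOn(scheduler)
                // From here on, values are delivered on an io() worker thread.
                .observeOn(Schedulers.io())
                .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                        Thread.currentThread().getName()))
                .subscribe();
    }

}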

Upvotes: 1

softjake

Reputation: 1076

Contrary to what newcomers often believe, reactive streams are not inherently concurrent, but they are inherently asynchronous. They are also inherently sequential, and concurrency must be configured within the stream. Put simply, reactive streams are naturally sequential at their ends but can be concurrent at their core.

The secret sauce is the flatMap() operator within the stream. This operator takes the Observable<T> from the source stream and internally re-emits it as an Observable<Observable<T>>, subscribing to all of the inner instances at once. As long as the flatMap() internal stream runs in a multi-threaded context, it concurrently executes the supplied Function<T, R> that applies your logic and finally re-emits the results on the original stream as its own emissions.

This sounds very complicated (and it is quite a bit at first glance) but simple examples with explanations help to understand the concept.
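
The "sequential at the ends, concurrent at the core" shape can be imitated with plain JDK classes. The sketch below is only an analogy, not RxJava and not how flatMap() is implemented: it fans each element out to a thread pool with CompletableFuture and then merges the results back into one sequence on the calling thread. The class name FanOutSketch and the method process are made up for illustration. Unlike flatMap(), this version keeps the results in source order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical illustration class; a JDK-only analogy, not part of RxJava.
class FanOutSketch {

    // Transforms every word on the given pool, then merges the results on the caller's thread.
    static List<String> process(List<String> words, ExecutorService pool) {
        // Fan out: start one asynchronous task per element (the "concurrent core").
        List<CompletableFuture<String>> inner = words.stream()
                .map(w -> CompletableFuture.supplyAsync(
                        () -> w.toUpperCase() + "@" + Thread.currentThread().getName(), pool))
                .collect(Collectors.toList());

        // Merge: wait for each task in turn, yielding a single sequential result list.
        return inner.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            process(Arrays.asList("one", "two", "three"), pool).forEach(System.out::println);
        } finally {
            // The pool threads are non-daemon, so shut the pool down to let the JVM exit.
            pool.shutdown();
        }
    }
}
```

Each printed line carries the name of the pool thread that handled the word, so running it a few times shows the work spread across several threads while the final list is still consumed by one thread.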

You can find more details in a similar question here and in articles on RxJava2 Schedulers and Concurrency, with code samples and detailed explanations of how to use Schedulers sequentially and concurrently.

Hope this helps,

Softjake

Upvotes: 1

akarnokd

Reputation: 70017

As Vladimir mentioned, RxJava's standard schedulers run work on daemon threads, which terminate in your example because the main thread quits. I'd like to emphasise that they don't schedule each value on a new thread; instead, they schedule the whole stream of values for each individual subscriber on a newly created thread. Subscribing a second time would give you "RxNewThreadScheduler-2".
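
The daemon-thread effect can be reproduced with the JDK alone. In the minimal sketch below, a plain daemon Thread stands in for a scheduler worker (it is not RxJava's implementation), and the main thread waits on a CountDownLatch rather than calling Thread.sleep(). The class and method names are hypothetical. With RxJava, the countDown() call would presumably go into the subscriber's completion and error callbacks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; a plain daemon thread stands in for a scheduler worker.
class DaemonWaitDemo {

    // Runs the task on a daemon thread and blocks until it finishes or the timeout elapses.
    // Returns true if the task finished in time.
    static boolean runOnDaemonAndWait(Runnable task, long timeoutMs) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                done.countDown(); // signal completion even if the task throws
            }
        }, "demo-daemon-1");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        worker.start();
        try {
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        boolean finished = runOnDaemonAndWait(
                () -> System.out.printf("uses thread %s%n", Thread.currentThread().getName()),
                5_000);
        System.out.println("finished = " + finished);
    }
}
```

Without the await() call, main() could return before the daemon thread prints anything, which matches the empty output described in the question.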

You don't really need to change the default schedulers; just wrap your own Executor with Schedulers.from() and supply the resulting Scheduler as a parameter where needed:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
        0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);

Scheduler s = Schedulers.from(exec);

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
    Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));
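
The question also asked about a custom ThreadFactory. A minimal JDK-only sketch, assuming a hypothetical NamedThreadFactory class (not part of RxJava), could name the threads and set the daemon flag explicitly. An executor built with it could then be passed to Schedulers.from() in the same way as exec above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: names threads "<prefix>-<n>" and controls the daemon flag.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        t.setDaemon(daemon);
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(2, new NamedThreadFactory("worker", false));
        exec.submit(() -> System.out.println("running on " + Thread.currentThread().getName()));
        // Non-daemon pool threads keep the JVM alive, so shut the pool down explicitly.
        exec.shutdown();
        exec.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Passing false for the daemon flag keeps the JVM running until the executor is shut down, so the choice between daemon and non-daemon threads also decides who is responsible for cleanup.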

I've got a series of blog posts about RxJava schedulers which should help you implement a "more permanent" variant.

Upvotes: 6
