bcoughlan
bcoughlan

Reputation: 26617

rxjava2 - simple example of executing tasks on a thread pool, subscribing on a single thread

I'm experimenting with the following task to get my head around RxJava:

So I tried it out in Kotlin:

val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name }

I expected it to print

pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....

However it prints:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1

Can anyone correct my misunderstandings about how this works? Why does it not use all of the threads of the thread pool? How can I get my subscriber to run on the main thread or block until completion?

Upvotes: 4

Views: 1542

Answers (2)

gildor
gildor

Reputation: 1894

To make code inside your map block work in parallel you should wrap it to observable with own scheduler:

val ex = Executors.newFixedThreadPool(10)
    val scheduler = Schedulers.from(ex)
    Observable.fromIterable((1..100).toList())
            .flatMap {
                Observable
                        .fromCallable { Thread.currentThread().name }
                        .subscribeOn(scheduler)
            }
            .subscribe { println(it + " " + Thread.currentThread().name) }

In this case, you will see the result:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...

You can check article RxJava - Achieving Parallelization that gives explanations of this behavior.

Also, RxJava 2.0.5 introduced ParallelFlowable API

Upvotes: 2

nhaarman
nhaarman

Reputation: 100358

Rx is not meant as a parallel execution service, use Java's streams api for that. Rx events are synchronous, and will flow through the stream subsequently. When building the stream, observeOn will request a thread once and process the emissions one by one on that thread.

You also expected subscribe to be executed on the main thread. observeOn switches the threads and all downstream events happen on that thread. If you want to switch to the main thread, you will have to insert another observeOn just before subscribe.

Upvotes: 5

Related Questions