iCodeLikeImDrunk

Reputation: 17806

How come RxJava uses only ~10 threads when processing asynchronously?

Consider the following code, in which I am trying to get the Observables to run asynchronously:

    try {
        DateTime now = DateTime.now();
        Observable
                .from(map.entrySet())
                .subscribeOn(Schedulers.from(new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 10)))
                .flatMap(Async.toAsync((Map.Entry<String, Info> entry) -> {
                    // processing work, makes multiple http requests for ref data
                }))
                .doOnCompleted(() -> System.out.println("completed yo...."))
                .doOnError(Throwable::printStackTrace)
                .toList()
                .timeout(1, TimeUnit.MINUTES)
                .toBlocking()
                .single()
                ;

        logger.info(now.toString());
        logger.info(DateTime.now().toString());

        saveToFile(gson.toJson(setForRx));
    } catch (Exception e) {
        e.printStackTrace();
    }

The output shows that the processing runs on the same ~10 threads. How can I increase that?

Sample output:

INFO  2015-06-29 15:11:20,524 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3 
INFO  2015-06-29 15:11:20,526 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6 
INFO  2015-06-29 15:11:20,542 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4 
INFO  2015-06-29 15:11:20,546 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7 
INFO  2015-06-29 15:11:20,571 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2
INFO  2015-06-29 15:11:20,694 [rxjava.ConcurrentRxJava] RxComputationThreadPool-1 
INFO  2015-06-29 15:11:20,920 [rxjava.ConcurrentRxJava] RxComputationThreadPool-8
INFO  2015-06-29 15:11:21,035 [rxjava.ConcurrentRxJava] RxComputationThreadPool-7 
INFO  2015-06-29 15:11:21,039 [rxjava.ConcurrentRxJava] RxComputationThreadPool-4 
INFO  2015-06-29 15:11:21,055 [rxjava.ConcurrentRxJava] RxComputationThreadPool-5
INFO  2015-06-29 15:11:21,081 [rxjava.ConcurrentRxJava] RxComputationThreadPool-3 
INFO  2015-06-29 15:11:21,094 [rxjava.ConcurrentRxJava] RxComputationThreadPool-6 
INFO  2015-06-29 15:11:21,118 [rxjava.ConcurrentRxJava] RxComputationThreadPool-2 

In my executor version, using Runtime.getRuntime().availableProcessors() * 10 gives me a pool of 80 threads. Is that possible with RxJava?

Upvotes: 2

Views: 1031

Answers (1)

akarnokd

Reputation: 69997

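By default, toAsync() runs on the computation() scheduler, which has a fixed number of threads (one per available processor unless configured otherwise, which matches the RxComputationThreadPool-1 through -8 names in your output). The subscribeOn() call only changes where the source is subscribed; it does not change where toAsync() executes its function. There is an overload of toAsync() that takes a Scheduler, so you should refactor the Schedulers.from(...) into a local variable and pass that variable to toAsync().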
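As a rough illustration of the same idea using only the JDK (an analogy, not RxJava code): CompletableFuture.runAsync(Runnable) runs on a shared default pool, while the runAsync(Runnable, Executor) overload runs on whatever pool the caller supplies. The class name PoolChoiceSketch, the helpers namedPool and threadsUsed, the "io-pool-" thread prefix, and the 20 ms sleep standing in for an HTTP request are all made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolChoiceSketch {

    // Hypothetical helper: a fixed-size pool whose threads carry a recognizable name.
    static ExecutorService namedPool(int size, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size,
                r -> new Thread(r, prefix + counter.incrementAndGet()));
    }

    // Runs `tasks` short blocking jobs and returns the names of the threads that ran them.
    // A null executor selects the no-executor overload (shared default pool);
    // otherwise the jobs run on the caller-supplied pool.
    static Set<String> threadsUsed(int tasks, Executor executor) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Runnable job = () -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(20); // stand-in for a blocking HTTP request
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(executor == null
                    ? CompletableFuture.runAsync(job)
                    : CompletableFuture.runAsync(job, executor));
        }
        futures.forEach(CompletableFuture::join); // wait for every job to finish
        return names;
    }

    public static void main(String[] args) {
        int size = Runtime.getRuntime().availableProcessors() * 10;
        ExecutorService pool = namedPool(size, "io-pool-");
        try {
            System.out.println("default pool:  " + threadsUsed(200, null).size() + " threads");
            System.out.println("explicit pool: " + threadsUsed(200, pool).size()
                    + " of " + size + " threads");
        } finally {
            pool.shutdown(); // let the JVM exit once the work is done
        }
    }
}
```

The point of the sketch is that the pool is chosen at the call that starts the asynchronous work. Creating a larger pool elsewhere in the chain has no effect unless it is handed to that call.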

Upvotes: 4
