jstnchng
jstnchng

Reputation: 2221

How to subscribe in rxJava to a different thread pool

I'm running this bit in my program right now

Observable.from(constituents.entrySet()).subscribeOn(Schedulers.newThread())
        .flatMap(Async.toAsync((Map.Entry<String, ConstituentInfo> entry) -> {
            logger.info(Thread.currentThread().getName());
            ConstituentInfo constituent = entry.getValue();

            String securityBySymbol = Redux.getSecurityBySymbol(entry.getKey());

            String company = UtilityMethods.getNestedJsonObject(securityBySymbol, "company");
            Integer compId = UtilityMethods.getIntegerFromJsonObject(company, "id");
            String companyName = UtilityMethods.getStringFromJsonObject(company, "name");
            String tier = UtilityMethods.getNestedJsonObject(securityBySymbol, "tier");
            String tierId = UtilityMethods.getStringFromJsonObject(tier, "id");
            String marketPlace = UtilityMethods.getStringFromJsonObject(tier, "name");
            String countryName = getCountryName(compId);

            constituent.setCompanyName(StringUtils.isBlank(companyName) ? NA : companyName);
            constituent.setMarketPlace(StringUtils.isBlank(marketPlace) ? NA : marketPlace);
            constituent.setCountryName(StringUtils.isBlank(countryName) ? NA : countryName);
            constituent.setTierId(StringUtils.isBlank(tierId) ? NA : tierId);

            return constituent;
        })).subscribeOn(Schedulers.newThread())
        .toList()
        .timeout(30, TimeUnit.MINUTES)
        .toBlocking()
        .single();

and it runs concurrently, but it runs on the RxComputationThreadPool. I was wondering how to make it run on Schedulers.newThread(), and if it would improve performance.

Alternatively, if it won't improve performance, is there a way to make the below code run any faster?

Upvotes: 0

Views: 103

Answers (1)

akarnokd
akarnokd

Reputation: 69997

There is an overload of toAsync which takes a Scheduler and you don't need subscribeOn. The computation() scheduler is the lowest latency scheduler from all. io() is likely and newThread() surely starts a new thread and thus can take several hundred microseconds to execute the first task but they are well suited to blocking I/O or network calls where this latency doesn't really matter.

Upvotes: 1

Related Questions