Reputation: 2221
I'm running this bit in my program right now
Observable.from(constituents.entrySet()).subscribeOn(Schedulers.newThread())
.flatMap(Async.toAsync((Map.Entry<String, ConstituentInfo> entry) -> {
logger.info(Thread.currentThread().getName());
ConstituentInfo constituent = entry.getValue();
String securityBySymbol = Redux.getSecurityBySymbol(entry.getKey());
String company = UtilityMethods.getNestedJsonObject(securityBySymbol, "company");
Integer compId = UtilityMethods.getIntegerFromJsonObject(company, "id");
String companyName = UtilityMethods.getStringFromJsonObject(company, "name");
String tier = UtilityMethods.getNestedJsonObject(securityBySymbol, "tier");
String tierId = UtilityMethods.getStringFromJsonObject(tier, "id");
String marketPlace = UtilityMethods.getStringFromJsonObject(tier, "name");
String countryName = getCountryName(compId);
constituent.setCompanyName(StringUtils.isBlank(companyName) ? NA : companyName);
constituent.setMarketPlace(StringUtils.isBlank(marketPlace) ? NA : marketPlace);
constituent.setCountryName(StringUtils.isBlank(countryName) ? NA : countryName);
constituent.setTierId(StringUtils.isBlank(tierId) ? NA : tierId);
return constituent;
})).subscribeOn(Schedulers.newThread())
.toList()
.timeout(30, TimeUnit.MINUTES)
.toBlocking()
.single();
and it runs concurrently, but it runs on the RxComputationThreadPool. I was wondering how to make it run on Schedulers.newThread(), and if it would improve performance.
Alternatively, if it won't improve performance, is there a way to make the below code run any faster?
Upvotes: 0
Views: 103
Reputation: 69997
There is an overload of toAsync
which takes a Scheduler
and you don't need subscribeOn
. The computation()
scheduler is the lowest latency scheduler from all. io()
is likely and newThread()
surely starts a new thread and thus can take several hundred microseconds to execute the first task but they are well suited to blocking I/O or network calls where this latency doesn't really matter.
Upvotes: 1