Reputation: 7991
I have a list of integers, and for each integer I want to fetch a string value from the database and put it into a Map<Integer, String>, where the key is the integer from the list and the value is the string fetched from the database. I also want the fetches to run in parallel.
This is how I've implemented it with RxJava. The code works and returns the expected result, but I don't think the names are being fetched in parallel.
public Observable<Map<Integer, String>> getVitalsForConnectedDevices(int accountId) {
    List<Integer> ids = Lists.newArrayList(2, 3, 5);
    return Observable.from(ids)
            .flatMap((Func1<Integer, Observable<Map.Entry<Integer, String>>>) integer
                    -> Observable.just(Maps.immutableEntry(integer, deviceDAO.getVitalName(integer)))
                            .subscribeOn(Schedulers.io()), 20)
            .toMap(entry -> entry.getKey(), entry -> entry.getValue());
}
Here's the getVitalName() method:
public String getVitalName(int vitalId) {
    log.debug("id: " + vitalId);
    String query = "SELECT name FROM vitals WHERE vital_id=?";
    String name = v1JdbcTemplate.queryForObject(query, String.class, vitalId);
    log.debug("name: " + name);
    return name;
}
It prints the debug statements from the above method in this order:
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 2
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Steps
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 3
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Floors
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 5
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Distance
If it were running in parallel, shouldn't it print all the ids first and the names later? What am I doing wrong here? How do I get the fetches to run in parallel?
Upvotes: 0
Views: 1141
Reputation: 4002
"shouldn't it be printing all the id's first and names later"
The answer is no, because for each id you request a String from the database. When a request finishes, the id and the String are packaged into an Entry and pushed down the pipeline. Note that toMap only completes successfully once every inner Observable created in flatMap has completed; if any of them emits an error, that error is passed on instead. So you may wait quite a while for the final result.
As an example: you iterate over 5 ids, and all requests are fired in parallel. The server is under heavy load, so some requests take a while. Let's say all requests but one have finished. Only when the last request has finished will the resulting Map<> be pushed to the subscriber. If this is what you want, I would suggest returning a Single<> rather than an Observable<>.
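To make the "wait for the slowest request" behaviour concrete, here is a minimal sketch using only the JDK's CompletableFuture rather than RxJava. It is an analogy, not the RxJava pipeline itself: slowLookup, fetchAll and the class name are hypothetical, and the sleep stands in for the database call. The map becomes available only after every lookup has finished, much like toMap after flatMap.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WaitForAllSketch {

    // Hypothetical stand-in for a database lookup; the delay grows with the id.
    static String slowLookup(int id) {
        try {
            Thread.sleep(100L * id);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "name-" + id;
    }

    // Starts one lookup per id on the pool; the returned future completes
    // only when all lookups are done, so the slowest one sets the total time.
    static CompletableFuture<Map<Integer, String>> fetchAll(List<Integer> ids, ExecutorService pool) {
        Map<Integer, String> result = new ConcurrentHashMap<>();
        CompletableFuture<?>[] lookups = ids.stream()
                .map(id -> CompletableFuture.runAsync(() -> result.put(id, slowLookup(id)), pool))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(lookups).thenApply(ignored -> result);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Map<Integer, String> names = fetchAll(Arrays.asList(1, 2, 3), pool).join();
            System.out.println(names);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the result is a single value that arrives once, a single-valued return type such as Single<> describes it more precisely than a stream type.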
Here is a test with output:
@Test
void name() throws Exception {
    Observable<Tuple2<Integer, String>> tuple2Observable = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap(integer ->
                    Observable.fromCallable(() -> getVitalName(integer))
                            .subscribeOn(Schedulers.io())
                            .doOnNext(s -> System.out.println("Value:: " + Thread.currentThread().getName() + "-" + Instant.now()))
                            .map(s -> Tuple.of(integer, s))
            ).doOnComplete(() -> System.out.println("Finished:: " + Thread.currentThread().getName() + "-" + Instant.now()));
    tuple2Observable.test()
            .await();
}

public String getVitalName(int vitalId) throws Exception {
    System.out.println("getVitalName method called with vitalId = " + vitalId + "-" + Thread.currentThread().getName() + "-" + Instant.now());
    Thread.sleep(500);
    String name = "le fake value";
    return name;
}
You can see that the calls were indeed made concurrently on different threads, and that the Observable completed only after all requests had finished.
getVitalName method called with vitalId = 4-RxCachedThreadScheduler-4-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 6-RxCachedThreadScheduler-6-2017-10-08T21:20:43.786Z
getVitalName method called with vitalId = 5-RxCachedThreadScheduler-5-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 1-RxCachedThreadScheduler-1-2017-10-08T21:20:43.785Z
getVitalName method called with vitalId = 2-RxCachedThreadScheduler-2-2017-10-08T21:20:43.784Z
getVitalName method called with vitalId = 3-RxCachedThreadScheduler-3-2017-10-08T21:20:43.787Z
Value:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-6-2017-10-08T21:20:44.303Z
Value:: RxCachedThreadScheduler-1-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-2-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-3-2017-10-08T21:20:44.304Z
Value:: RxCachedThreadScheduler-5-2017-10-08T21:20:44.304Z
Finished:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.317Z
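One detail separates this test from the code in the question. In the question, the argument of Observable.just(...) is evaluated before just is even called, so getVitalName runs on whatever thread executes the flatMap lambda, and subscribeOn only moves the emission of the already computed entry. In the test, Observable.fromCallable defers the call until subscription, which subscribeOn places on an io thread. The sketch below shows the same eager versus deferred distinction with plain JDK classes; it is an analogy under stated assumptions, and lookupThread, eager, deferred and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerVsDeferredSketch {

    // Hypothetical stand-in for getVitalName: returns the name of the thread that ran it.
    static String lookupThread(int id) {
        return Thread.currentThread().getName();
    }

    // Eager: lookupThread(id) is evaluated as a method argument on the calling
    // thread, before the future exists. Comparable to Observable.just(value).
    static List<String> eager(List<Integer> ids) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int id : ids) {
            futures.add(CompletableFuture.completedFuture(lookupThread(id)));
        }
        return joinAll(futures);
    }

    // Deferred: only the lambda is handed over, so the lookup itself runs on a
    // pool thread. Comparable to Observable.fromCallable(...) with subscribeOn.
    static List<String> deferred(List<Integer> ids, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int id : ids) {
            futures.add(CompletableFuture.supplyAsync(() -> lookupThread(id), pool));
        }
        return joinAll(futures);
    }

    // Waits for every future and collects the results in submission order.
    static List<String> joinAll(List<CompletableFuture<String>> futures) {
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Integer> ids = Arrays.asList(2, 3, 5);
            System.out.println("caller:   " + Thread.currentThread().getName());
            System.out.println("eager:    " + eager(ids));
            System.out.println("deferred: " + deferred(ids, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Applied to the question's pipeline, the corresponding change would be to wrap the DAO call in Observable.fromCallable (or Observable.defer) inside flatMap instead of passing its result to Observable.just, as the test above does.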
Upvotes: 1