Reputation: 31
I am trying to call REST API parallel using Observables. I am facing some issue on this. I described problem here. could some one help me on this ?
My use case is, I have 5 customerid, i need to invoke another REST API by passing this user id concurrently to get the response.
so i decided to use Observables here for better performance to access the service concurrently.
I tried with 3 below option. but i feel, all of them provide them same response time. I don't see any difference in response time for all this calls
Can some one find mistakes in this code where it went wrong. whether i have used Observables correctly?
1) Observable.from(customerIds).flatMap(customerId ->
asyncUserRetrieve(customerId)
.subscribeOn(Schedulers.io()))
.subscribe(cust -> {
custDetails.add(cust);
});
2) Observable.from(customerIds).flatMap(customerId ->
asyncUserRetrieve(customerId)
.subscribe(cust -> {
custDetails.add(cust);
});
3) for(String id : customerId) {
Customer customer = asyncUserRetrieve(id).toBlocking().single();.
custDetails.add(cust);
}
@Override
public Observable<Customer> asyncUserRetrieve(String customerId) {
final URI uri = getURL(customerId);
final Response response = httpClient.callForResponse(uri);
if (response.getHttpStatus().is2xxSuccessful()) {
Customer customer = getResponse(response, customerId);
return Observable.just(customer);
}
return Observable.just(new Customer().setError(true));
}
Upvotes: 1
Views: 2648
Reputation: 3494
The problem is in the asyncUserRetrieve
implementation.
So the things happen when asyncUserRetreive
is called are:
Instead, the flow should be something like this:
One possible implementation would be:
@Override
public Observable<Customer> asyncUserRetrieve(String customerId) {
return Observable.fromCallable(() -> {
final URI uri = getURL(customerId);
final Response response = httpClient.callForResponse(uri);
if (response.getHttpStatus().is2xxSuccessful()) {
Customer customer = getResponse(response, customerId);
return customer;
}
return new Customer().setError(true);
}
}
Update:
If you want your current thread to wait until all response comes, try doing something like this:
RxJava 1
List<Customer> tempCustomers = Observable.from(customerIds)
.flatMap(customerId -> {
asyncUserRetrieve(customerId)
.subscribeOn(Schedulers.io())
})
.toList()
.toBlocking() // Do this only if you want to block the thread.
.single()
custDetails.addAll(tempCustomers);
RxJava 2
List<Customer> tempCustomers = Observable.fromIterable(customerIds)
.flatMap(customerId -> {
asyncUserRetrieve(customerId)
.subscribeOn(Schedulers.io())
})
.toList()
.blockingGet()
custDetails.addAll(tempCustomers);
Upvotes: 1