Arun Kalvichandran
Arun Kalvichandran

Reputation: 31

Multiple REST API calls parallel using RX Observables

I am trying to call REST API parallel using Observables. I am facing some issue on this. I described problem here. could some one help me on this ?

My use case is, I have 5 customerid, i need to invoke another REST API by passing this user id concurrently to get the response.

so i decided to use Observables here for better performance to access the service concurrently.

I tried with 3 below option. but i feel, all of them provide them same response time. I don't see any difference in response time for all this calls

Can some one find mistakes in this code where it went wrong. whether i have used Observables correctly?

1) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribeOn(Schedulers.io()))
                .subscribe(cust -> {
                        custDetails.add(cust);

                });

2) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribe(cust -> {

                        custDetails.add(cust);
                });
3) for(String id : customerId) {
    Customer customer = asyncUserRetrieve(id).toBlocking().single();.
    custDetails.add(cust);
}

    @Override
    public Observable<Customer> asyncUserRetrieve(String customerId) {
            final URI uri = getURL(customerId);
            final Response response = httpClient.callForResponse(uri);
            if (response.getHttpStatus().is2xxSuccessful()) {
                Customer customer = getResponse(response, customerId);
                return Observable.just(customer);
            }
            return Observable.just(new Customer().setError(true));

        } 

Upvotes: 1

Views: 2648

Answers (1)

Sanlok Lee
Sanlok Lee

Reputation: 3494

The problem is in the asyncUserRetrieve implementation. So the things happen when asyncUserRetreive is called are:

  1. Calls server.
  2. Waits until server response comes. <- Thread is blocked
  3. Returns an Observable.
  4. Returned Observable emits immediately.


Instead, the flow should be something like this:

  1. Immediately returns an Observable that makes server request
  2. Returned observable emits when server response comes.

One possible implementation would be:

@Override
public Observable<Customer> asyncUserRetrieve(String customerId) {
    return Observable.fromCallable(() -> {
        final URI uri = getURL(customerId);
        final Response response = httpClient.callForResponse(uri);
        if (response.getHttpStatus().is2xxSuccessful()) {
            Customer customer = getResponse(response, customerId);
            return customer;
        }
        return new Customer().setError(true);

    } 
}

Update:

If you want your current thread to wait until all response comes, try doing something like this:

RxJava 1

List<Customer> tempCustomers = Observable.from(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .toBlocking() // Do this only if you want to block the thread.
    .single()

custDetails.addAll(tempCustomers);

RxJava 2

List<Customer> tempCustomers = Observable.fromIterable(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .blockingGet()

custDetails.addAll(tempCustomers);

Upvotes: 1

Related Questions