Dhana D.

Reputation: 1720

How to do multiple API calls concurrently in Spring service without changing main?

I need to write a service in a large existing codebase that fetches results from 4 APIs, reformats each response, and merges them. It is very slow because the calls run one after another, and I don't know how to run them concurrently. I also can't change the main class to add a Runnable or an executor there, as that may have a snowballing effect on other code.

So far, I have a controller that handles the request and a service that takes the user's request and calls 5 different service-middleware (SM) functions. Every SM function calls an external API, and in each SM I also reformat the map returned by that API. I use java.net.HttpURLConnection to do the API calls. My API "works", but it can't get faster than 4 seconds. Each of those APIs also needs an additional OAuth call, so it is roughly 10 API calls in total.

Since the API calls currently return Object, I can treat the result as a Map and reformat the output by looping over the data inside it. The SM function looks similar to the code below:

token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
List<Map> data = (List) ((Map) response.get("output")).get("data");
List<Map> result = new ArrayList<>();
for(Map m : data) {
  Map temp = new HashMap();
  temp.put("name", m.get("Name"));
  temp.put("health_status", m.get("HealthStatus"));
  result.add(temp);
}

// This format is mandatory
Map finalResult = new HashMap();
finalResult.put("output", result);
finalResult.put("status", "OK");
return finalResult;

sendHttpRequest is the method that sends the request, serializing the params to JSON and deserializing the API output to an Object. Here is what sendHttpRequest looks like:

CloseableHttpClient httpClient = HttpClients.custom()
                        .setSSLSocketFactory(csf)
                        .build();

HttpComponentsClientHttpRequestFactory requestFactory =
                        new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(this.connectTimeOut);
requestFactory.setReadTimeout(this.readTimeOut);
requestFactory.setHttpClient(httpClient);

RestTemplate rt = new RestTemplate(requestFactory);
HttpEntity<Map> request = null;
if(method.equals("POST"))
    request = new HttpEntity<Map>(objBody, headers);
else if(method.equals("GET"))
    request = new HttpEntity<Map>(headers);
    
try {
    ResponseEntity<Map> response = null;
    // Use the RestTemplate built above (rt) and keep the returned entity
    if(method.equals("POST"))
        response = rt.postForEntity(url, request , Map.class);
    if(method.equals("GET"))
        response = rt.exchange(url, HttpMethod.GET, request, Map.class);
    if(this.outputStream){
        logger.debug("Output : " + response.getBody());
    }
    return response.getBody();
} catch(HttpClientErrorException e) {
    logger.debug(e.getMessage());
}

sendHttpRequest is also an existing method that I am not allowed to change, unless I write a new method used only for my own requests.

Simply put, here is what I need to do:

  1. For each of the API calls:

    • Get the Authorization token from an external API.
    • Do the request (POST/GET) to another external API to get data.
    • Reformat the data into the expected response format (each has its own format), mostly by looping over the array in the response object and remapping field names as necessary.
  2. After all API calls have finished, I need to:

    • Merge the output from APIs 1 and 3 into a Map/Object
    • Merge the output from APIs 2 and 4 into an Array and sort it
    • Put the response from API 5 in the inner object of a defined attribute/field.

Things I had tried

I tried using ExecutorCompletionService to call the 5 SMs. I also created an inner class that implements Callable for this.

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService<Map> completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");
        
        // Collect the results for each SM (the SM function is described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

In the "Merge the outputs" step, I need to construct a map like this:

{
  "details": {"api1": {...}, "api3": {...}},
  "list_items": [{...}, {...}, ...], // Results of sorted merged lists from api2 & api4
  "api5": [{...}, {...}, {...}, ...]
}

From the API responses, I basically just retrieve their output_schema when it exists.

Any tips to optimize and speed this up, so that the same number of calls executes faster? Any help is greatly appreciated.

Edit


I have read @Ananthapadmanabhan's answer, but it seems I would need to change the main class file, which I can't do. Or is it actually possible to use CompletableFuture without @EnableAsync in the main class? I also wonder how to make this chain of processes faster even with CompletableFuture and @EnableAsync.

Upvotes: 3

Views: 31077

Answers (2)

jccampanero

Reputation: 53461

The solution you tried looks quite decent to me:

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService<Map> completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");
        
        // Collect the results for each SM (the SM function is described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

I am not sure that CompletableFuture, beyond offering a more fluent API, will give you any performance benefit over this; the subject has been broadly discussed here on SO (see for example 1, 2, 3). Still, here is a possible solution.

The following code is based on one of my previous answers, which is in turn closely related to this article from Tomasz Nurkiewicz's blog.

The CompletableFuture counterpart for the code you provided will look like:

ExecutorService executor = Executors.newFixedThreadPool(5);

// List of the different parameters needed to perform each external API invocation
final List<Map> smParameters = Arrays.asList(
  ...
);


// Submit invoke external task to the thread pool 
final List<CompletableFuture<Map>> futures = smParameters.stream().
  map(paramMap -> CompletableFuture.supplyAsync(() -> invokeExternalAPI(paramMap), executor)).
  collect(Collectors.<CompletableFuture<Map>>toList())
;

// The next code is based on the sequence method proposed in the blog I cited
// The idea is to turn the `List<CompletableFuture<Map>>` we have into a
// CompletableFuture<List<Map>> with the results of every single async task
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

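// Merge the outputs. thenApply (not thenAccept) is needed to return a value,
// and join() waits for that value.
final Map result = allDone.thenApply(results -> {
  // The results variable contains the different Maps
  // obtained from each API invocation
  Map merged = new HashMap();
  // populate merged from results
  return merged;
}).join();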

Please verify the above code; it will probably require defining the type parameters of your Map objects.

The mentioned invokeExternalAPI could accept a Map with the different parameters required to perform each individual API invocation, something like:
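To try the pattern end to end, here is a minimal, self-contained sketch. The fakeApiCall method and its delay are hypothetical stand-ins for the real service middleware calls. It is only meant to show that the calls overlap instead of running one after another, and that joining the futures in list order returns the results in submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ParallelCallsSketch {

    // Hypothetical stand-in for one service middleware call (auth + request + reformat).
    static Map<String, Object> fakeApiCall(String name, long delayMillis) {
        try {
            Thread.sleep(delayMillis); // simulates network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Map<String, Object> result = new HashMap<>();
        result.put("serviceMiddleware", name);
        result.put("status", "OK");
        return result;
    }

    // Runs all calls concurrently and returns the results in submission order.
    // Assumes names is not empty (a fixed pool needs at least one thread).
    static List<Map<String, Object>> callAll(List<String> names, long delayMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(names.size());
        try {
            List<CompletableFuture<Map<String, Object>>> futures = new ArrayList<>();
            for (String name : names) {
                futures.add(CompletableFuture.supplyAsync(() -> fakeApiCall(name, delayMillis), executor));
            }
            // Wait for every task, then read each result in list order.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Map<String, Object>> results = callAll(Arrays.asList("sm1", "sm2", "sm3", "sm4", "sm5"), 200);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " results in about " + elapsedMillis + " ms");
    }
}
```

With five simulated calls of 200 ms each, the elapsed time should be much closer to 200 ms than to 1000 ms, although the exact figure depends on the machine.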

private Map invokeExternalAPI(Map configuration) {
  // Pass and extract from the configuration the authUrl, etcetera, everything you need to

  // Your code...

  token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
  Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
  List<Map> data = (List) ((Map) response.get("output")).get("data");
  List<Map> result = new ArrayList<>();
  for(Map m : data) {
    Map temp = new HashMap();
    temp.put("name", m.get("Name"));
    temp.put("health_status", m.get("HealthStatus"));
    result.add(temp);
  }

  // This format is mandatory
  Map finalResult = new HashMap();
  finalResult.put("output", result);
  finalResult.put("status", "OK");
  return finalResult;
}

I think you don't need to modify your main class or any configuration, as the solution is based on plain Java.

Please bear in mind that this generic approach can be customized to accommodate different requirements.

For example, according to your comments, it seems that you need to invoke from your service the functionality implemented in your different service middlewares.

To define the list of tasks you want to perform concurrently, you could try something like the following instead of my initial suggestion:
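One caveat with the plain Java approach: calling Executors.newFixedThreadPool(5) on every request without ever shutting the pool down leaks threads. A possible arrangement, assuming your service is a long-lived singleton, is to keep one pool in a field and shut it down when the service is destroyed (in Spring, a @PreDestroy method could call close()). The ConcurrentInvoker class and its method names below are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class ConcurrentInvoker implements AutoCloseable {

    // One pool shared by all requests, created once with the owning object.
    private final ExecutorService executor;

    ConcurrentInvoker(int threads) {
        this.executor = Executors.newFixedThreadPool(threads);
    }

    // Submits every task first, then waits; results keep the order of the tasks.
    <T> List<T> invokeAll(List<Supplier<T>> tasks) {
        // First pipeline: start all tasks. Collecting here matters, because a single
        // lazy stream that submits and joins would run the tasks one by one.
        List<CompletableFuture<T>> futures = tasks.stream()
            .map(task -> CompletableFuture.supplyAsync(task, executor))
            .collect(Collectors.toList());
        // Second pipeline: wait for each result.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    boolean isClosed() {
        return executor.isShutdown();
    }

    // Stops accepting new tasks; already submitted tasks are allowed to finish.
    @Override
    public void close() {
        executor.shutdown();
    }
}
```

The pool size of 5 used elsewhere in this answer matches the five middleware calls of a single request; a shared pool serving many simultaneous requests would probably need to be sized differently.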

List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the first middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = CompletableFuture.supplyAsync(() -> sm1.doYourStuff(), executor);
futures.add(sm1Cf);

// Now obtain a reference to the second middleware, and submit it as well
final ServiceMiddleware2 sm2 = new ServiceMiddleware2();
final CompletableFuture<Map> sm2Cf = CompletableFuture.supplyAsync(() -> sm2.doYourStuff(), executor);
futures.add(sm2Cf);

// Do the same for the rest of the service middlewares. I think a common
// interface or some kind of inheritance could help with the invocation

// At the end, you will have the list of futures you want to execute in parallel

// The rest of the code is the same
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

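// Merge the outputs. thenApply (not thenAccept) is needed to return a value,
// and join() waits for that value.
final Map result = allDone.thenApply(results -> {
  // The results variable contains the different Maps
  // obtained from each API invocation
  Map merged = new HashMap();
  // populate merged from results
  return merged;
}).join();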

To deal with errors you have several options.

An obvious one is to handle the error in the service middleware itself, so that it never raises an exception but instead returns some information in its result Map, such as a result code or status.

CompletableFuture itself gives you different options to deal with errors as well. As you probably need to make some changes to the result Map, you can use the handle method when necessary. It takes as arguments the result and the possible exception obtained from the task associated with a CompletableFuture, and returns a new CompletableFuture with the appropriate customizations based on that result or error. For example, in your 4th and 5th service middlewares, which seem to raise errors, you can use something like:

final ServiceMiddleware4 sm4 = new ServiceMiddleware4();
final CompletableFuture<Map> sm4Cf = CompletableFuture.supplyAsync(() -> sm4.doYourStuff(), executor)
  .handle((result, exception) -> {
      if (exception == null) {
        return result;
      }

      Map actualResult = new HashMap();
      actualResult.put("errorCode", "xxx");
      actualResult.put("errorMessage", exception.getMessage());
      return actualResult;
    });
futures.add(sm4Cf);

This great article, for instance, explains in detail further error handling approaches.
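One detail worth knowing when using handle this way: if the task submitted through CompletableFuture.supplyAsync throws, the exception passed to handle is a CompletionException wrapping the original one, so getMessage() on it also contains the class name of the cause. The sketch below unwraps the cause before building the error map. HandleSketch, unwrap and callSafely are hypothetical names, and the common pool is used only to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class HandleSketch {

    // Returns the original failure when it is wrapped in a CompletionException.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Runs the task asynchronously and converts any failure into an error map,
    // so the caller always receives a Map and never an exception.
    static Map<String, Object> callSafely(Supplier<Map<String, Object>> task) {
        return CompletableFuture.supplyAsync(task)
            .handle((result, exception) -> {
                if (exception == null) {
                    return result;
                }
                Map<String, Object> error = new HashMap<>();
                error.put("status", "ERROR");
                error.put("errorMessage", unwrap(exception).getMessage());
                return error;
            })
            .join();
    }
}
```

For instance, a task that throws new IllegalStateException("boom") yields a map whose errorMessage is "boom" rather than "java.lang.IllegalStateException: boom".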

All these approaches assume that your code doesn't throw checked exceptions. If you need to deal with them, as it seems according to your comment, you could use a modified version of the code that Holger posted in this SO answer. The idea is to create a method that catches the checked exception and completes the future with the appropriate error if necessary:

// Takes a Callable rather than a Supplier, because Callable.call() is allowed
// to throw checked exceptions
public static <T> CompletableFuture<T> supplyAsync(Callable<T> task, Executor executor) {
    CompletableFuture<T> f = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        try { f.complete(task.call()); } catch(Throwable t) { f.completeExceptionally(t); }
    }, executor);
    return f;
}
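To try such a helper in isolation, here is a self-contained sketch of a Callable-based variant (a Callable, unlike a Supplier, may throw checked exceptions). The CheckedAsync class and the callThatMayFail method are hypothetical; the latter stands in for a middleware call that declares IOException.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class CheckedAsync {

    // Like CompletableFuture.supplyAsync, but accepts a task that may throw checked exceptions.
    static <T> CompletableFuture<T> supplyAsync(Callable<T> task, Executor executor) {
        CompletableFuture<T> future = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                future.complete(task.call());
            } catch (Throwable t) {
                // The failure is stored as-is, so exceptionally() sees the original exception.
                future.completeExceptionally(t);
            }
        }, executor);
        return future;
    }

    // Hypothetical middleware call that declares a checked exception.
    static String callThatMayFail(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("remote API unavailable");
        }
        return "OK";
    }
}
```

A call such as CheckedAsync.supplyAsync(() -> CheckedAsync.callThatMayFail(true), executor).exceptionally(t -> "ERROR: " + t.getMessage()) then completes with "ERROR: remote API unavailable" instead of failing.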

Then use this supplyAsync helper to submit every service middleware task:

List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the first middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = supplyAsync(() -> sm1.doYourStuff(), executor)
  // this callback will only be executed if an exception is thrown
  .exceptionally(exception -> { 
    Map errorResult = new HashMap();
    errorResult.put("errorCode", "xxx");
    errorResult.put("errorMessage", exception.getMessage());
    return errorResult; 
  });
futures.add(sm1Cf);

// Apply a similar logic to the rest of services middlewares...

// The rest of the code is the same as above
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
// Please be aware that, in the lambda expression, results
// is a List of the different Maps obtained as the individual
// results of every single service middleware call.
// I would create an object that aggregates these results in
// the right format, as you indicated in your question. Let's call
// this container class ServiceMiddlewareResult. Then the merge
// code will look similar to this
final ServiceMiddlewareResult result = allDone.thenApply(results -> {
  ServiceMiddlewareResult serviceMiddlewareResult = new ServiceMiddlewareResult();
  // Variable used for temporarily storing the API 2 and 4 results
  // Parameterize it as necessary
  List tempResultsFromApi2AndApi4 = new ArrayList();
  // Because the futures list is joined in order, results has the same
  // order as futures, regardless of which call completed first. Even
  // so, it is more robust not to rely on positions, so it is important
  // that every Map contains the minimal information to identify the
  // corresponding service middleware. With that in mind, your code
  // will look similar to this:
  results.forEach(smResult -> {
    // The suggested idea, identify the service middleware that
    // produced the results
    String serviceMiddleware = (String) smResult.get("serviceMiddleware");
    switch(serviceMiddleware) {
      // handle every case appropriately
      case "sm1": {
        // sm3 should be handled similarly
        serviceMiddlewareResult.getDetails().setApi1(...);
        break;
      }

      case "sm2":
      case "sm4": {
        // Extract results from the Map, and add to the temporary list
        tempResultsFromApi2AndApi4.add(...);
        break;
      }

      case "sm5": {
        // extract results and populate corresponding object
        serviceMiddlewareResult.setApi5(...);
        break;
      }
    }
  });

  // Collections.sort sorts the list in place and returns nothing
  Collections.sort(
    tempResultsFromApi2AndApi4, ... the appropriate comparator...
  );
  serviceMiddlewareResult.setListItems(tempResultsFromApi2AndApi4);

  return serviceMiddlewareResult;
}).join();

I modified the example to provide a possible approach to merge your results.

Please consider including logging in your service middleware code if you need to trace the calls and make the overall solution easier to debug.

If you have used them before, you could also try solutions based on libraries like RxJava or Project Reactor as an alternative.
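As a runnable illustration of the merge step using plain Maps instead of a container class, here is a minimal sketch. It assumes every result Map carries a "serviceMiddleware" key identifying its source and an "output" value, and it sorts the merged list by a "name" field; the key names, the sort field and the MergeSketch class are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MergeSketch {

    // Builds the response structure described in the question:
    // {"details": {"api1": ..., "api3": ...}, "list_items": [...], "api5": ...}
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(List<Map<String, Object>> results) {
        Map<String, Object> details = new LinkedHashMap<>();
        List<Map<String, Object>> listItems = new ArrayList<>();
        Map<String, Object> merged = new LinkedHashMap<>();

        for (Map<String, Object> r : results) {
            // Assumed to be present in every result; a missing key would fail here.
            String source = (String) r.get("serviceMiddleware");
            Object output = r.get("output");
            switch (source) {
                case "sm1": details.put("api1", output); break;
                case "sm3": details.put("api3", output); break;
                case "sm2":
                case "sm4":
                    // APIs 2 and 4 are assumed to return lists of maps.
                    listItems.addAll((List<Map<String, Object>>) output);
                    break;
                case "sm5": merged.put("api5", output); break;
                default: break; // unknown source: ignored in this sketch
            }
        }

        // Sorting by "name" is an assumption; use whatever ordering you need.
        listItems.sort(Comparator.comparing(item -> String.valueOf(item.get("name"))));
        merged.put("details", details);
        merged.put("list_items", listItems);
        return merged;
    }
}
```

Because each Map identifies its own source, the merge does not depend on the order in which the results arrive.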

Upvotes: 3

Ananthapadmanabhan

Reputation: 6226

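If all 4 API calls are independent of each other and you are using Java 8, you could extract them into separate methods in a separate service layer and annotate each method with Spring's @Async, using CompletableFuture as the return type, to make the calls in parallel. Note that @Async only takes effect when @EnableAsync is present on a @Configuration class (it does not have to be the main class) and when the method is called from another bean.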

@Service
public class TestClient {
    RestTemplate restTemplate = new RestTemplate();

    @Async
    public CompletableFuture<List<TestPojo>> getTestPojoByLanguage(String language) {
        String url = "https://test.eu/rest/v2/lang/" + language + "?fields=name";
        // The array type must match the declared List<TestPojo> return type
        TestPojo[] response = restTemplate.getForObject(url, TestPojo[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }

    @Async
    public CompletableFuture<List<TestPojo>> getCountriesByRegion(String region) {
        String url = "https://testurl.eu/rest/v2/region/" + region + "?fields=name";
        TestPojo[] response = restTemplate.getForObject(url, TestPojo[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }
}

Completable Future guide.

Upvotes: 1
