varman
varman

Reputation: 8894

How to make threads in ExecutorService to wait stage

In my web application, I need to call around more than 10 methods in one API call. To make that efficient I use ExecutorService to create multiple threads at a same time. Each methods returning different Objects expect fav_a(), fav_b(), fav_c(). (Sample code is given below for easiness)

@GetMapping(RequestUrl.INIT)
public ResponseEntity<Map<String, List<?>>> init() throws ExecutionException, InterruptedException {

    ExecutorService service = Executors.newFixedThreadPool(6);

    Future<List<Object>> method_a = service.submit(() -> someService.method_a());
    Future<List<Object>> method_b = service.submit(() -> someService.method_b());
    Future<List<Object>> method_c = service.submit(() -> someService.method_c());       
    
    Future<List<FavouriteConverter>> fav_a = service.submit(() -> someService.fav_a());
    Future<List<FavouriteConverter>> fav_b = service.submit(() -> someService.fav_b());
    Future<List<FavouriteConverter>> fav_c = service.submit(() -> someService.fav_c());

    service.shutdown();

    List<FavouriteConverter> combinedFavourite = Stream.of(fav_a.get(), fav_b.get(), fav_c.get()).flatMap(f -> f.stream()).collect(Collectors.toList());
    combinedFavourite=combinedFavourite.stream()
            .sorted(Comparator.comparing(FavouriteConverter::get_id, Comparator.reverseOrder()))
            .limit(25)
            .collect(Collectors.toList());

    Map<String, List<?>> map = new HashMap<>();    

    map.put("method_a", method_a.get());
    map.put("method_b", method_b.get());
    map.put("method_c", method_c.get());
    map.put("favourite", combinedFavourite);

    return new ResponseEntity<>(map, HttpStatus.OK);

}

First I need to get fav_a.get(), fav_b.get(), fav_c.get() to make combinedFavourite. If any of one delays, the logic will be wrong. Creating threads are expensive.

  1. Does Stream automatically handle this kind of situation?
  2. If fav_a(), fav_b(), fav_c() do it jobs earlier than other methods, How can I put combinedFavourite into another thread? This means how to make Future<List<FavouriteConverter>> combinedFavourite in waiting stage until fav_a.get(), fav_b.get(), fav_c.get() finishes. (Assume method_a(),method_b(),method_c() still running.)

Upvotes: 1

Views: 187

Answers (1)

Nikolas
Nikolas

Reputation: 44496

  1. No, Streams are not responsible for joining these threads.

  2. Since you wait for the results of these 3 threads and putting them into a map which you return, wrapping such logic in a separate thread doesn't help you as long as you have to wait and return the result.

    Use ExecutorService::invokeAll to execute all the tasks and returning a list of Futures when all are complete (when Future::done is true).

    List<Future<List<Object>>> list = service.invokeAll(
        Arrays.asList(
            () -> someService.method_a(),
            () -> someService.method_b(),
            () -> someService.method_c()
    ));
    

    Note these are guaranteed:

    • The result List<Future> is in the same order as the collection of tasks given (according to its Iterator).
    • All the tasks will run in a separate thread if the pooled number of threads are higher or equal than executed tasks (assuming there are no other tasks using a thread from the same thread pool).

This logics helps you to work with complete results.

Upvotes: 2

Related Questions