KingTravisG

Reputation: 1336

Execute multiple downloads and wait for all to complete

I am currently working on an API service that allows 1 or more users to download 1 or more items from an S3 bucket and return the contents to the user. While the downloading is fine, the time taken to download several files is pretty much 100-150 ms * the number of files.

I have tried a few approaches to speeding this up: parallelStream() instead of stream() (which, considering the number of simultaneous downloads, is at serious risk of running out of threads), CompletableFuture, and even creating an ExecutorService, doing the downloads, then shutting down the pool. Typically I would only want a few concurrent tasks per request, e.g. 5 at the same time, to try to cut down on the number of active threads.

I have tried integrating Spring @Cacheable to store the downloaded files in Redis (the files are read-only). While this certainly cuts down response times (several ms to retrieve a file compared to 100-150 ms), the benefit only applies once the file has already been retrieved.

What is the best way to wait for multiple async tasks to finish and then get their results, considering that I don't want (and don't think I could have) hundreds of threads opening HTTP connections and downloading all at once?

Upvotes: 2

Views: 2594

Answers (2)

srborlongan

Reputation: 4579

Following @Hank D's lead, you can encapsulate the creation of the executor service to ensure that you do, indeed, call ExecutorService::shutdownNow after using said executor:

private static <VALUE> VALUE execute(
  final int nThreads,
  final Function<ExecutorService, VALUE> function
) {
  final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
  try {
    // Function::apply declares no checked exceptions, so no catch block is needed here
    return function.apply(executorService);
  } finally {
    executorService.shutdownNow(); // important to call this when you're done with the executor service.
  }
}

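public static void main(final String... arguments) {
  // define variables
  final List<Path> downloadedPaths = execute(
    MAX_THREADS_FOR_DOWNLOADS,
    executor -> s3Paths
      .stream()
      .map(s3Path -> CompletableFuture.supplyAsync(
        () -> mys3Downloader.downloadAndGetPath(s3Path),
        executor
      ))
      .collect(Collectors.toList()) // submit every download before waiting on any of them
      .stream()
      .map(CompletableFuture::join) // wait here: execute() calls shutdownNow() as soon as this lambda returns
      .collect(Collectors.toList())
  );
  // use downloadedPaths
}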

Upvotes: 1

Hank D

Reputation: 6491

You're right to be concerned about tying up the common fork/join pool used by default in parallel streams, since I believe it is also used for other things outside of the Stream API, such as parallel sort operations. Rather than saturating the common fork/join pool with an I/O-bound parallel stream, you can create your own fork/join pool for the Stream. See this question to find out how to create an ad hoc ForkJoinPool with the size you want and run a parallel stream in it.
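As a rough sketch of that first option, the parallel stream can be started from a task submitted to a dedicated ForkJoinPool. The class name, the mapInPool helper, and the string-building "download" are placeholders for illustration, not part of the original code. Note also that a parallel stream running in the pool it was started from is, as far as I know, observed behaviour of the JDK rather than something the Stream documentation promises.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;

final class BoundedParallelStream {

    // Hypothetical helper: applies `task` to every input using a parallel
    // stream that is started from inside a dedicated pool of `parallelism` threads.
    static <T, R> List<R> mapInPool(List<T> inputs, Function<T, R> task, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // submit(Callable) returns a ForkJoinTask; get() blocks until the
            // whole stream pipeline has finished.
            return pool.submit(
                    () -> inputs.parallelStream().map(task).collect(Collectors.toList())
            ).get();
        } finally {
            pool.shutdown(); // the pool is ad hoc, so release its threads
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> keys = Arrays.asList("a.txt", "b.txt", "c.txt");
        // Stand-in for the real S3 download call.
        List<String> results = mapInPool(keys, key -> "contents of " + key, 5);
        System.out.println(results);
    }
}
```

Creating a pool per call keeps the sketch short; in a service handling many requests you would more likely share one bounded pool.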

You could also create an ExecutorService with a fixed-size thread pool that would also be independent of the common fork/join pool, and would throttle the requests, using only the threads in the pool. It also lets you specify the number of threads to dedicate:

ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS_FOR_DOWNLOADS);
try {
    List<CompletableFuture<Path>> downloadTasks = s3Paths
            .stream()
            .map(s3Path -> CompletableFuture.supplyAsync(() -> mys3Downloader.downloadAndGetPath(s3Path), executor))
            .collect(Collectors.toList());

    // at this point, all requests are enqueued, and threads will be assigned as they become available

    executor.shutdown();    // stops accepting requests, does not interrupt threads,
                            // items in queue will still get threads when available

    // wait for all downloads to complete
    CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[downloadTasks.size()])).join();

    // at this point, all downloads are finished,
    // so it's safe to shut down executor completely

} catch (CompletionException e) {
    // join() reports a failed download as an unchecked CompletionException
    e.printStackTrace();
} finally {
    executor.shutdownNow(); // important to call this when you're done with the executor.
}
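The snippet above waits for the downloads but does not show how to read their results. One way to do that, sketched here with a hypothetical runAll helper and a trivial stand-in task instead of the real S3 call, is to call join() on each future once allOf has completed:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CollectResults {

    // Hypothetical helper: runs `task` for every input on at most `maxThreads`
    // threads and returns the results in input order.
    static <T, R> List<R> runAll(List<T> inputs, Function<T, R> task, int maxThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        try {
            List<CompletableFuture<R>> futures = inputs.stream()
                    .map(input -> CompletableFuture.supplyAsync(() -> task.apply(input), executor))
                    .collect(Collectors.toList());

            // Blocks until every task has finished; throws CompletionException
            // if any of them failed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            // Every future is already complete here, so join() does not block.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for downloading three objects, five at a time at most.
        List<String> results = runAll(Arrays.asList("a.txt", "b.txt", "c.txt"),
                key -> "contents of " + key, 5);
        System.out.println(results);
    }
}
```

Because the pool is fixed-size, no more than maxThreads tasks run at once; the rest wait in the executor's queue.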

Upvotes: 3
