Christian Fries

Reputation: 16942

What is the best / most elegant way to limit the number of concurrent evaluations (as with a fixed thread pool) in parallel streams?

Assume a lambda expression consumes a certain amount of a limited resource (like memory), which requires limiting the number of concurrent executions (example: if the lambda temporarily consumes 100 MB (of local memory) and we would like to limit usage to 1 GB, we should not allow more than 10 concurrent evaluations).

What is the best way to limit the number of concurrent executions, say, for example, in

IntStream.range(0, numberOfJobs).parallel().forEach( i -> { /*...*/ });

?

Note: An obvious option is to use nesting, like

    double jobsPerThread = (double)numberOfJobs / numberOfThreads;
    IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
        IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));

Is this the only way? It is not that elegant. Actually I would like to have something like

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).forEach( i -> { /*...*/ });

Upvotes: 5

Views: 2986

Answers (2)

assylias

Reputation: 328795

Depending on your use case, using the CompletableFuture utility methods may be easier:

import static java.util.concurrent.CompletableFuture.runAsync;

ExecutorService executor = Executors.newFixedThreadPool(10); //max 10 threads
for (int i = 0; i < numberOfJobs; i++) {
    final int job = i; //the loop variable is not effectively final, so copy it for the lambda
    runAsync(() -> /* do something with job */, executor);
}

//or with a stream:
IntStream.range(0, numberOfJobs)
         .forEach(i -> runAsync(() -> /* do something with i */, executor));

The main difference from your code is that the parallel forEach only returns after the last job has finished, whereas runAsync returns as soon as all the jobs have been submitted. There are various ways to change that behaviour if required.
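For instance, if you need to block until every job has completed (as the parallel forEach would), one option is to keep the CompletableFutures and join on all of them. This is a sketch, not part of the original answer; the class and method names are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class WaitForCompletion {

    // submits numberOfJobs tasks to a fixed pool of 10 threads and
    // blocks until all of them have finished; returns the number of jobs executed
    static int runJobs(int numberOfJobs) {
        ExecutorService executor = Executors.newFixedThreadPool(10); // max 10 concurrent jobs
        AtomicInteger completed = new AtomicInteger();

        // keep the futures so we can wait for them afterwards
        CompletableFuture<?>[] futures = IntStream.range(0, numberOfJobs)
                .mapToObj(i -> CompletableFuture.runAsync(completed::incrementAndGet, executor))
                .toArray(CompletableFuture[]::new);

        // blocks until every submitted job is done
        CompletableFuture.allOf(futures).join();
        executor.shutdown();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runJobs(100) + " jobs completed");
    }
}
```

CompletableFuture.allOf completes only when all of the given futures have completed, so join() restores the blocking behaviour of the parallel forEach while still bounding concurrency via the fixed pool.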

Upvotes: 2

Holger

Reputation: 298499

Streams use a ForkJoinPool for parallel operations. By default, they use ForkJoinPool.commonPool(), whose concurrency cannot be changed afterwards. However, you can use your own ForkJoinPool instance: when you execute the stream code within a task running in your own ForkJoinPool, that pool will be used for the stream operations. The following example illustrates this by executing the same operation once using the default behavior and once using a custom pool with a fixed parallelism of 2:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class InterfaceStaticMethod {
    public static void main(String[] arg) throws Exception {
      Runnable parallelCode=() -> {
        // use a thread-safe set: the filter lambda runs on multiple threads concurrently
        Set<String> allThreads=ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000_000).parallel().filter(i->{
          allThreads.add(Thread.currentThread().getName()); return false;}
        ).min();
        System.out.println("executed by "+allThreads);
      };
      System.out.println("default behavior: ");
      parallelCode.run();
      System.out.println("specialized pool:");
      ForkJoinPool pool=new ForkJoinPool(2);
      pool.submit(parallelCode).get();
    }
}
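Applied to the question's original loop, the same technique might look like the sketch below (the class and method names are made up for illustration, and numberOfThreads/the job body stand in for the question's placeholders). Note that routing a parallel stream through a custom ForkJoinPool this way relies on long-standing but undocumented behavior:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class LimitedParallelism {

    // runs numberOfJobs jobs with at most numberOfThreads executing concurrently,
    // by submitting the parallel stream pipeline to a dedicated ForkJoinPool
    static int runLimited(int numberOfJobs, int numberOfThreads) throws Exception {
        AtomicInteger executed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(numberOfThreads);
        try {
            pool.submit(() ->
                IntStream.range(0, numberOfJobs).parallel()
                         .forEach(i -> executed.incrementAndGet() /* the resource-hungry job */)
            ).get(); // blocks until the whole stream pipeline is done
        } finally {
            pool.shutdown();
        }
        return executed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runLimited(1000, 10) + " jobs executed");
    }
}
```

Unlike the runAsync approach, the get() call here blocks until the entire pipeline has finished, matching the semantics of the original parallel forEach while capping concurrency at numberOfThreads.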

Upvotes: 4
