Reputation: 16942
Assume a lambda expression consumes a certain amount of a limited resource (such as memory), so the number of concurrent executions has to be limited. For example, if the lambda temporarily consumes 100 MB of local memory and we want to stay below 1 GB, we must not allow more than 10 concurrent evaluations.
What is the best way to limit the number of concurrent executions, for example in
IntStream.range(0, numberOfJobs).parallel().forEach( i -> { /*...*/ });
?
Note: An obvious option is to perform a nesting like
double jobsPerThread = (double)numberOfJobs / numberOfThreads;
IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));
Is this the only way? It is not that elegant. What I would actually like to have is something like
IntStream.range(0, numberOfJobs).parallel(numberOfThreads).forEach( i -> { /*...*/ });
Upvotes: 5
Views: 2986
Reputation: 328795
Depending on your use case, using the CompletableFuture utility methods may be easier:
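import static java.util.concurrent.CompletableFuture.runAsync;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
ExecutorService executor = Executors.newFixedThreadPool(10); // max 10 threads
for (int i = 0; i < numberOfJobs; i++) {
    final int job = i; // a lambda may only capture effectively final variables
    runAsync(() -> { /* do something with job */ }, executor);
}
// or with a stream:
IntStream.range(0, numberOfJobs)
    .forEach(i -> runAsync(() -> { /* do something with i */ }, executor));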
The main difference with your code is that the parallel forEach will only return after the last job is over, whereas runAsync will return as soon as all the jobs have been submitted. There are various ways to change that behaviour if required.
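One way to get the blocking behaviour back is to collect the futures and wait on CompletableFuture.allOf. The following is only a sketch: the class name BoundedJobs, the method runAll and the counter standing in for the real job are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper, not part of the original answer.
class BoundedJobs {

    /** Runs numberOfJobs jobs on at most maxThreads threads and returns once all have finished. */
    static int runAll(int numberOfJobs, int maxThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger completed = new AtomicInteger(); // stand-in for the real work
        try {
            CompletableFuture<?>[] futures = IntStream.range(0, numberOfJobs)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> {
                        /* do something with i */
                        completed.incrementAndGet();
                    }, executor))
                    .toArray(CompletableFuture[]::new);
            // Blocks the caller until every submitted job has completed.
            CompletableFuture.allOf(futures).join();
        } finally {
            executor.shutdown(); // let the pool threads exit
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100, 10) + " jobs completed");
    }
}
```

Because the fixed thread pool has maxThreads threads, at most that many jobs run at the same time; the remaining jobs wait in the executor's queue.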
Upvotes: 2
Reputation: 298499
Streams use a ForkJoinPool for parallel operations. By default they use ForkJoinPool.commonPool(), which does not allow changing the concurrency afterwards. However, you can use your own ForkJoinPool instance. When you execute the stream code within the context of your own ForkJoinPool, this pool will be used for the stream operations. The following example illustrates this by executing the same operation once using the default behavior and once using a custom pool with a fixed concurrency of 2:
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class InterfaceStaticMethod {
    public static void main(String[] arg) throws Exception {
        Runnable parallelCode = () -> {
            // thread-safe set, because several worker threads add to it at once
            Set<String> allThreads = ConcurrentHashMap.newKeySet();
            IntStream.range(0, 1_000_000).parallel().filter(i -> {
                allThreads.add(Thread.currentThread().getName()); return false; }
            ).min();
            System.out.println("executed by " + allThreads);
        };
        System.out.println("default behavior: ");
        parallelCode.run();
        System.out.println("specialized pool:");
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.submit(parallelCode).get();
        pool.shutdown();
    }
}
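Applied to the forEach from the question, the same idea might look like the sketch below. The class name LimitedParallelStream and the method runJobs are hypothetical, and recording thread names is only a stand-in for the real job. Note that the stream picking up the submitting pool is observed behavior of the current implementation; as far as I know it is not spelled out in the Stream API documentation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical helper, not part of the original answer.
class LimitedParallelStream {

    /** Runs the jobs inside a dedicated pool and returns the names of the threads that executed them. */
    static Set<String> runJobs(int numberOfJobs, int numberOfThreads)
            throws InterruptedException, ExecutionException {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(numberOfThreads);
        try {
            // The parallel stream is started from a worker of 'pool',
            // so its tasks are forked into 'pool' instead of the common pool.
            pool.submit(() ->
                IntStream.range(0, numberOfJobs).parallel().forEach(i -> {
                    /* do something with i */
                    threads.add(Thread.currentThread().getName());
                })
            ).get(); // wait until the whole stream has been processed
        } finally {
            pool.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("executed by " + runJobs(1_000, 2));
    }
}
```

The call to get() keeps the blocking semantics of the original parallel forEach, and shutting the pool down afterwards avoids leaking its worker threads.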
Upvotes: 4