Reputation: 36654
I have a test code with parallelStream()
that sends requests to a server machine.
Report report =
requestsList.parallelStream()
.map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL))
.map(response -> resultsComparer.compareToBl(response, e2EResultLongBL,
astarHistogramsArrayBl, latencyHistogramBl))
.reduce(null,
(sumReport, compare2) ->
{
if (sumReport == null) {
sumReport = new Report();
}
sumReport.add(compare2);
return sumReport;
},
(report1, report2) ->
{
Report report3 = new Report();
report3.add(report1);
report3.add(report2);
return report3;
});
The loads are too much for this machine and very quickly it returns HTTP 404 errors.
There are two things I didn't find an answer on Google:
Upvotes: 1
Views: 4264
Reputation: 100209
You may launch your pipeline inside the custom ForkJoinPool
and it will be used to get the result:
ForkJoinPool fjp = new ForkJoinPool(2);
System.out.println("My pool: " + fjp);
String result = CompletableFuture.supplyAsync(
() -> Stream.of("a", "b", "c").parallel()
.peek(x -> System.out.println(
((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
.collect(Collectors.joining()), fjp).join();
System.out.println(result);
My StreamEx library adds a syntactic sugar method .parallel(fjp)
to make this simpler:
String result = StreamEx.of("a", "b", "c")
.parallel(fjp)
.peek(x -> System.out.println(
((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
.collect(Collectors.joining());
Upvotes: 1
Reputation: 23329
ParallelStream
uses ForkJoinPool.commonPool()
which is initialized to your number of cores - 1.
It it possible to pass your own ForkJoin Executor
to parallelStream as described here, but the executor has to be a ForkJoin, which is not the best for IO bound tasks.
Upvotes: 2
Reputation: 137084
The Stream API uses ForkJoinPool
to execute concurrent tasks. Quoting its documentation:
The common pool is by default constructed with default parameters, but these may be controlled by setting three system properties:
java.util.concurrent.ForkJoinPool.common.parallelism
- the parallelism level, a non-negative integer
...
Also
a
ForkJoinPool
may be constructed with a given target parallelism level; by default, equal to the number of available processors.
So to customize the number of threads, you can set the system property java.util.concurrent.ForkJoinPool.common.parallelism
to the value you want:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4")
to set the number of worker threads to 4. By default, the number of threads will be equal to the number of processors you have.
Upvotes: 2