Elad Benda

Reputation: 36654

How to reduce # of threads in Java parallelStream?

I have test code that uses parallelStream() to send requests to a server machine.

Report report = 
    requestsList.parallelStream()
                .map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL))
                .map(response -> resultsComparer.compareToBl(response, e2EResultLongBL,
                            astarHistogramsArrayBl, latencyHistogramBl))
                .reduce(null,
                        (sumReport, compare2) ->
                        {
                            if (sumReport == null) {
                                sumReport = new Report();
                            }
                            sumReport.add(compare2);
                            return sumReport;
                        },
                        (report1, report2) ->
                        {
                            Report report3 = new Report();
                            report3.add(report1);
                            report3.add(report2);
                            return report3;
                        });

The load is too much for this machine, and it very quickly starts returning HTTP 404 errors.

There are two things I couldn't find an answer to on Google:

  1. What is the default number of threads for parallelStream, if it is not set explicitly?
  2. How can I set the number of worker threads to, say, 4?

Upvotes: 1

Views: 4264

Answers (3)

Tagir Valeev

Reputation: 100209

You can launch your pipeline inside a custom ForkJoinPool, and that pool will then be used to compute the result:

ForkJoinPool fjp = new ForkJoinPool(2);
System.out.println("My pool: " + fjp);
String result = CompletableFuture.supplyAsync(
    () -> Stream.of("a", "b", "c").parallel()
    .peek(x -> System.out.println(
        ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
    .collect(Collectors.joining()), fjp).join();
System.out.println(result);

My StreamEx library adds a syntactic-sugar method, .parallel(fjp), that makes this simpler:

String result = StreamEx.of("a", "b", "c")
    .parallel(fjp)
    .peek(x -> System.out.println(
        ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
    .collect(Collectors.joining());
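
Applied to the question's goal of four workers, the plain-JDK variant might look like the sketch below. The class name FourWorkerStream, the method process, and the String::toUpperCase step (a stand-in for the real request) are made up for illustration. Note that a parallel stream running in the pool that submitted it is an implementation behaviour of the JDK rather than a documented guarantee.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class FourWorkerStream {
    // Runs a parallel stream inside a dedicated pool of `workers` threads.
    static List<String> process(List<String> requests, int workers) {
        ForkJoinPool pool = new ForkJoinPool(workers);
        try {
            // The stream is started from a task of `pool`, so its subtasks
            // are forked into `pool` instead of the common pool.
            return pool.submit(() ->
                requests.parallelStream()
                        .map(String::toUpperCase) // placeholder for the real request
                        .collect(Collectors.toList())
            ).join();
        } finally {
            pool.shutdown(); // custom pools are not cleaned up automatically
        }
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "b", "c"), 4)); // [A, B, C]
    }
}
```

Shutting the pool down in a finally block avoids leaving idle worker threads behind if the pipeline throws.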

Upvotes: 1

Sleiman Jneidi

Reputation: 23329

parallelStream() uses ForkJoinPool.commonPool(), whose parallelism defaults to your number of cores - 1.
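
To check what the default is on a given machine, something like this should do (the class name CommonPoolInfo is only for illustration):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    // Parallelism of the common pool that parallel streams use by default.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:    " + cores);
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The thread that invokes the stream also takes part in the work, which is why the pool itself is sized one below the core count.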

It is possible to pass your own ForkJoinPool to parallelStream as described here, but the executor has to be a ForkJoinPool, which is not the best choice for IO-bound tasks.
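
For IO-bound calls, one alternative is to skip parallelStream and cap concurrency with an ordinary fixed-size executor. This is a rough sketch, not a drop-in replacement: BoundedRequests and mapWithLimit are made-up names, and String::length stands in for the real server call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class BoundedRequests {
    // Applies `call` to every input using at most `threads` worker threads.
    static <T, R> List<R> mapWithLimit(List<T> inputs, Function<T, R> call, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            // Submit everything first so the calls can overlap...
            List<CompletableFuture<R>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> call.apply(in), executor))
                .collect(Collectors.toList());
            // ...then wait for each result, keeping the input order.
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // String::length is a placeholder for the real request.
        List<Integer> lengths = mapWithLimit(Arrays.asList("a", "bb", "ccc"), String::length, 4);
        System.out.println(lengths); // [1, 2, 3]
    }
}
```

Collecting the futures into a list before joining matters: joining inside the same lazy stream would run the calls one at a time.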

Upvotes: 2

Tunaki

Reputation: 137084

The Stream API uses ForkJoinPool to execute concurrent tasks. Quoting its documentation:

The common pool is by default constructed with default parameters, but these may be controlled by setting three system properties:

  • java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer
    ...

Also

a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

So to customize the number of threads, you can set the system property java.util.concurrent.ForkJoinPool.common.parallelism to the value you want:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

to set the number of worker threads to 4. By default, the common pool's parallelism is one less than the number of available processors, because the thread that invokes the stream also takes part in the work.
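
One caveat worth showing: the property is read when the common pool is created, so it has to be set before anything in the JVM touches that pool. A minimal sketch (CommonPoolSize and parallelSum are illustrative names only):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolSize {
    // A small parallel computation that runs on the common pool.
    static int parallelSum(int n) {
        return IntStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        // Must run before the common pool is first used; once the pool
        // exists, changing the property has no effect.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

        System.out.println("Common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println(parallelSum(100)); // 5050
    }
}
```

Passing -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 on the java command line sidesteps the ordering problem. Either way the setting is JVM-wide, so it affects every parallel stream in the process, not just this one.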

Upvotes: 2
