Aaron

Reputation: 434

How can I use parallel execution on a single step of a Java Stream?

My stream pipeline contains a single expensive operation that I would like to run on multiple threads, while the remaining operations should stay single-threaded. For example:

package test;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class TestStreams {
    private static Set<String> expensiveThreads = ConcurrentHashMap.newKeySet();
    private static Set<String> cheapThreads = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) {
        IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                .unparallel()  //does not compile
                .forEach(i -> myCheapOperation(i));
        System.out.println("Expensive Threads:" + expensiveThreads);
        System.out.println("Cheap Threads:    " + cheapThreads);
    }

    private static void myCheapOperation(int i) {
        cheapThreads.add(Thread.currentThread().getName());
    }

    private static int myExpensiveMap(int i) {
        expensiveThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}

The current output is:

Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads:    [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]

But the output I would like is:

Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads:    [main]

I have tried wrapping the original stream using StreamSupport.stream(Spliterator, false), but this limits the whole pipeline, including the expensive step, to a single thread. For example:

StreamSupport.stream(
                IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                        .spliterator(), false)
                .forEach(i -> myCheapOperation(i));

Expensive Threads:[main]
Cheap Threads:    [main]

Passing parallel = true instead makes both steps run in parallel again:

StreamSupport.stream(
                IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                        .spliterator(), true)
                .forEach(i -> myCheapOperation(i));

Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads:    [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]

How can I unparallel this Stream?

Upvotes: 3

Views: 425

Answers (1)

Brian

Reputation: 17329

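The opposite of .parallel() is .sequential(). Be aware, though, that the mode is a property of the whole pipeline rather than of a single step: the pipeline runs in whichever mode the stream is in when the terminal operation is invoked, so the last of the two calls wins. The code below therefore runs sequentially from end to end, including myExpensiveMap: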

public static void main(String[] args) {
    IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
            .sequential()
            .forEach(i -> myCheapOperation(i));
    System.out.println("Expensive Threads:" + expensiveThreads);
    System.out.println("Cheap Threads:    " + cheapThreads);
}
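
Since a stream pipeline executes entirely in one mode, one way to get the output asked for in the question is to split the work into two pipelines. The following is only a sketch of that idea, not something taken from the answer above: the class name TwoPhaseStreams is made up for illustration, and it assumes that holding all intermediate results in an int[] is acceptable. The first pipeline ends with toArray() while still parallel; the second is a new stream over the array, which is sequential by default.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical class name; the helper methods mirror those in the question.
class TwoPhaseStreams {
    static final Set<String> expensiveThreads = ConcurrentHashMap.newKeySet();
    static final Set<String> cheapThreads = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) {
        // Phase 1: toArray() is the terminal operation of a parallel stream,
        // so the expensive map may be spread over the common ForkJoinPool.
        int[] results = IntStream.range(1, 1000)
                .parallel()
                .map(TwoPhaseStreams::myExpensiveMap)
                .toArray();

        // Phase 2: Arrays.stream(int[]) creates a new sequential stream,
        // so the cheap operation runs only on the calling thread.
        Arrays.stream(results).forEach(TwoPhaseStreams::myCheapOperation);

        System.out.println("Expensive Threads:" + expensiveThreads);
        System.out.println("Cheap Threads:    " + cheapThreads);
    }

    private static void myCheapOperation(int i) {
        cheapThreads.add(Thread.currentThread().getName());
    }

    private static int myExpensiveMap(int i) {
        expensiveThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return i;
    }
}
```

The trade-off is that the cheap step only starts once every expensive result has been computed and buffered, so this suits finite streams whose results fit in memory. How many worker threads appear in the first set depends on the machine's common pool parallelism.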

Upvotes: 1
