My stream pipeline contains a single expensive operation that I would like to run on multiple threads, while the remaining operations should run single-threaded. For example:
package test;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class TestStreams {
    private static Set<String> expensiveThreads = ConcurrentHashMap.newKeySet();
    private static Set<String> cheapThreads = ConcurrentHashMap.newKeySet();

    public static void main(String[] args) {
        IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                .unparallel() //does not compile
                .forEach(i -> myCheapOperation(i));
        System.out.println("Expensive Threads:" + expensiveThreads);
        System.out.println("Cheap Threads: " + cheapThreads);
    }

    private static void myCheapOperation(int i) {
        cheapThreads.add(Thread.currentThread().getName());
    }

    private static int myExpensiveMap(int i) {
        expensiveThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
The current output is:
Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads: [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
But the output I would like is:
Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads: [main]
I have tried wrapping the original stream using StreamSupport.stream(spliterator, false), but this limits the original stream to single-threaded processing as well. For example:
StreamSupport.stream(
        IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                .spliterator(), false)
        .forEach(i -> myCheapOperation(i));
Expensive Threads:[main]
Cheap Threads: [main]
Or using parallel = true, for example:
StreamSupport.stream(
        IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
                .spliterator(), true)
        .forEach(i -> myCheapOperation(i));
Expensive Threads:[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Cheap Threads: [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
How can I unparallel this Stream?
Upvotes: 3
Views: 425
Reputation: 17329
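The opposite of .parallel() is .sequential(). Be aware, though, that the mode belongs to the whole pipeline rather than to individual stages: whichever of parallel() or sequential() is called last before the terminal operation decides how every stage runs. The following compiles, but it also runs myExpensiveMap on the main thread: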
public static void main(String[] args) {
    IntStream.range(1, 1000).parallel().map(i -> myExpensiveMap(i))
            .sequential()
            .forEach(i -> myCheapOperation(i));
    System.out.println("Expensive Threads:" + expensiveThreads);
    System.out.println("Cheap Threads: " + cheapThreads);
}
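Since a single pipeline executes in one mode only, one way to get the split asked for in the question is to use two phases. What follows is a minimal sketch, assuming it is acceptable to buffer all intermediate results in memory: the parallel stage is finished with toArray(), and the cheap operation then runs in a plain loop on the calling thread. The class name SplitPhases and the method run() are hypothetical names chosen for this illustration; the two helper methods mirror the ones from the question.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class SplitPhases {
    static final Set<String> expensiveThreads = ConcurrentHashMap.newKeySet();
    static final Set<String> cheapThreads = ConcurrentHashMap.newKeySet();

    static int myExpensiveMap(int i) {
        expensiveThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return i;
    }

    static void myCheapOperation(int i) {
        cheapThreads.add(Thread.currentThread().getName());
    }

    // Hypothetical helper: phase 1 is a parallel pipeline that is fully
    // materialized; phase 2 is an ordinary loop on the calling thread.
    static int[] run() {
        int[] results = IntStream.range(1, 1000)
                .parallel()
                .map(SplitPhases::myExpensiveMap)
                .toArray(); // terminal operation: the parallel work ends here
        for (int r : results) {
            myCheapOperation(r); // runs only on the thread that called run()
        }
        return results;
    }

    public static void main(String[] args) {
        run();
        System.out.println("Expensive Threads:" + expensiveThreads);
        System.out.println("Cheap Threads: " + cheapThreads);
    }
}
```

The trade-off is that the cheap phase starts only after every expensive result has been computed, and all results are held in an array in between. Which threads show up in expensiveThreads depends on the machine and on the common pool's configuration.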