Reputation: 1
I've got a list of input elements which I want to queue into several ThreadPools. Let's say this is my input:
final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());
These are the three functions I want the elements to run through each after another:
final Function<Integer, Integer> step1 =
value -> { // input from the ints list
return value * 2;
};
final Function<Integer, Double> step2 =
value -> { // input from the previous step1
return (double) (value * 2); //
};
final Function<Double, String> step3 =
value -> { // input from the previous step2
return "Result: " + value * 2;
};
And these would be the pools for each step:
final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);
I want each element to run through step1Pool
and apply the step1
. As soon as one element is done its result should
end up in step2pool
so that step2
can be applied here. As soon as something in step2Pool
is done it should be
queued in step3Pool
and step3
should be applied.
On my main thread I want to wait until I have all the results from step3
. The order in which each element is processed
doesn't matter. Only that they all run through step1
-> step2
-> step3
on the correct thread pool.
Basically I want to parallelize the Stream.map
, push each result immediately to the next queue and wait until I've
got ints.size()
results from my last thread pool back.
Is there a simple way to do achieve in Java?
Upvotes: 0
Views: 170
Reputation: 870
I believe that CompletableFuture will help you here!
List<CompletableFuture<String>> futures = ints.stream()
.map(i -> CompletableFuture.supplyAsync(() -> step1.apply(i), step1Pool)
.thenApplyAsync(step2, step2Pool)
.thenApplyAsync(step3, step3Pool))
.collect(Collectors.toList());
List<String> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
Upvotes: 4
Reputation: 2020
Better use streams for that :
List<String> stringList = Stream.iterate(1, i -> i + 1)
.limit(100)
.parallel()
.map(step1)
.map(step2)
.map(step3)
.collect(Collectors.toList());
Upvotes: 0