Reputation: 119
I have a list containing a large amount of data (e.g. 50k items approximately). I need to perform an operation on each list item and then map the result of that processing to a Java object, which is written to a CSV file. So far, the sequential code for this works fine for me:
list.stream()
    .map(listItem -> service.methodA(listItem)
        .map(result -> mapToBean(result, listItem)))
    .flatMap(Optional::stream)
    .collect(Collectors.toList());
This operation is slow: it takes around 4 hours to process 2k items. So I decided to speed it up with asynchronous processing using CompletableFuture (I'm new to CompletableFuture). Here is the code I wrote for parallel processing:
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);

Lists.partition(list, 500).stream()
    .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service))
    .map(CompletableFuture::join)
    .flatMap(List::stream)
    .collect(Collectors.toList());
But when I run this code, I observe that thread 1 processes one part of the list, waits for it to complete, and only then does thread 2 start processing the next part, and so on. So the work effectively runs sequentially (albeit on different threads), and I gain nothing. I want each partition to be processed in parallel, and the results of all threads to be collected and combined at the end. Can someone help me achieve this?
Upvotes: 0
Views: 3512
Reputation: 11
ExecutorService service = Executors.newCachedThreadPool();

final var completableFutureList = Lists.partition(list, 500).stream()
        .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service))
        .collect(Collectors.toList());
private static <T> CompletableFuture<List<T>> executeAll(List<CompletableFuture<T>> completableFutures) {
    CompletableFuture<Void> completedFutures =
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture<?>[0]));
    return completedFutures.thenApply(v ->
            completableFutures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList())
    );
}
The trick is to use the existing allOf(): once completedFutures completes (which means all underlying futures are done), iterate over the futures and join() each one (normally a blocking wait). That call is guaranteed not to block because, by that point, every future has already completed. This way you still benefit from asynchronous programming.
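To make the whole pattern concrete, here is a minimal, self-contained sketch of the same idea applied to the partitioned list from the question. Integer items and String results stand in for the real item and bean types, and the placeholder executeListPart represents the per-partition service.methodA/mapToBean work:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.collect.Lists; // Guava, already used for Lists.partition in the question

public class AllOfExample {

    public static void main(String[] args) {
        List<Integer> list = IntStream.rangeClosed(1, 5_000).boxed().collect(Collectors.toList());

        ExecutorService service =
                Executors.newFixedThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() - 1));
        try {
            // 1. Submit every partition; do NOT join inside this stream,
            //    otherwise each partition is waited for before the next is submitted.
            List<CompletableFuture<List<String>>> futures = Lists.partition(list, 500).stream()
                    .map(part -> CompletableFuture.supplyAsync(() -> executeListPart(part), service))
                    .collect(Collectors.toList());

            // 2. Wait for all of them at once, then merge the per-partition results.
            List<String> allResults = executeAll(futures).join().stream()
                    .flatMap(List::stream)
                    .collect(Collectors.toList());

            System.out.println("Processed " + allResults.size() + " items");
        } finally {
            service.shutdown();
        }
    }

    // Placeholder for the real per-partition work (service.methodA + mapToBean).
    private static List<String> executeListPart(List<Integer> part) {
        return part.stream().map(i -> "bean-" + i).collect(Collectors.toList());
    }

    // allOf() + join() pattern: the join() calls never block here because
    // thenApply only runs once every underlying future has completed.
    private static <T> CompletableFuture<List<T>> executeAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}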
Upvotes: 0
Reputation: 421
The reason your tasks are executed sequentially is that you call .map(CompletableFuture::join) during the pipeline execution, so the stream waits for each CompletableFuture to complete before moving on to the next item. You need to remove that particular map operation from your stream.
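Because streams are evaluated lazily, element by element, your pipeline is effectively the equivalent of this loop:

for (var part : Lists.partition(list, 500)) {
    var future = CompletableFuture.supplyAsync(() -> executeListPart(part), service);
    future.join(); // blocks right here, so only one partition is ever in flight at a time
}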
To wait for all of the tasks to terminate, there are a couple of ways to do it. One is to call shutdown() on your ExecutorService and then awaitTermination(TIMEOUT). However, this is not practical if you are going to use that same ExecutorService again later in your app's lifecycle.
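If you did go the shutdown route, the waiting step could look roughly like this (the one-hour timeout is an arbitrary placeholder, and the futures are assumed to have been submitted as in the snippet below):

// Stop accepting new tasks and block until everything already submitted has finished.
service.shutdown();
try {
    if (!service.awaitTermination(1, TimeUnit.HOURS)) {
        service.shutdownNow(); // give up on anything still running after the timeout
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    service.shutdownNow();
}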
Another option would be to do something like the following:
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);

var completableFutureList = Lists.partition(list, 500).stream()
        .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service))
        .collect(Collectors.toList());
...and then:
completableFutureList.forEach(CompletableFuture::join);
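If you also need the combined results at the end (as in the question), you can collect while joining instead of only waiting, for example:

var results = completableFutureList.stream()
        .map(CompletableFuture::join)   // wait for each partition's result
        .flatMap(List::stream)          // merge the per-partition lists
        .collect(Collectors.toList());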
Happy hacking! =)
Upvotes: 2