Reputation: 301
I am writing a function that creates multiple (7) CompletableFutures. Each of these futures basically does two things: it fetches data from the database, and then uploads the results as a CSV.
When all the 7 futures have finished the job, I want to continue with further code execution. So, I am using allOf() and then calling a join() on the Void CompletableFuture returned by allOf().
The problem is, even after all futures have executed (I can see the CSVs getting generated), the join() call remains stuck and further code execution is blocked forever.
I have tried the following things:
Waiting on each future one by one, calling join() on each future right after creating it. This works, but at the cost of concurrency. I don't want to do this.
Using get() with a timeout instead of join(). But this always ends up throwing an exception (as get() always times out), which is undesirable.
Saw this JDK bug: https://bugs.openjdk.java.net/browse/JDK-8200347 . Not sure if this is a similar issue.
Running without a join() or get(). This does not hold up the calling thread at all, which again is not desirable.
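To narrow down which future never completes, the timed get() attempt can be extended to report the state of each future after the timeout. The following is only a minimal sketch under assumptions: the class name PendingFutures and the method pendingAfter are hypothetical, and the never-completed future in main merely stands in for a stuck fetchAndUploadCSV call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the original code.
public class PendingFutures {

    // Waits up to timeoutMillis for all futures, then returns the indexes
    // of the futures that are still not done.
    static List<Integer> pendingAfter(List<CompletableFuture<Void>> futures, long timeoutMillis) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // At least one future is still running; inspected below.
        } catch (ExecutionException e) {
            // All futures finished, but at least one of them failed.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            if (!futures.get(i).isDone()) {
                pending.add(i);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completed
        System.out.println(pendingAfter(Arrays.asList(done, stuck), 200)); // prints [1]
    }
}
```

If the returned list is empty even though allOf(...).join() hangs, the problem is probably not in the individual futures themselves.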
The main function which creates all futures.
public CustomResponse process() {
    CustomResponse msgResponse = new CustomResponse();
    try {
        // 1. DbCall 1
        CompletableFuture<Void> f1 = dataHelper.fetchAndUploadCSV1();
        // 2. DbCall 2
        CompletableFuture<Void> f2 = dataHelper.fetchAndUploadCSV2();
        // 3. DbCall 3
        CompletableFuture<Void> f3 = dataHelper.fetchAndUploadCSV3();
        // 4. DbCall 4
        CompletableFuture<Void> f4 = dataHelper.fetchAndUploadCSV4();
        // 5. DbCall 5
        CompletableFuture<Void> f5 = dataHelper.fetchAndUploadCSV5();
        // 6. DbCall 6
        CompletableFuture<Void> f6 = dataHelper.fetchAndUploadCSV6();
        // 7. DbCall 7
        CompletableFuture<Void> f7 = dataHelper.fetchAndUploadCSV7();
        CompletableFuture<Void>[] fAll = new CompletableFuture[] {f1, f2, f3, f4, f5, f6, f7};
        CompletableFuture.allOf(fAll).join();
        msgResponse.setProcessed(true);
        msgResponse.setMessageStatus("message");
    } catch (Exception e) {
        msgResponse.setMessageStatus(ERROR);
        msgResponse.setErrorMessage("error");
    }
    return msgResponse;
}
Each of the fetchAndUploadCSV() functions looks like this:
public CompletableFuture<Void> fetchAndUploadCSV1() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            return someService().getAllData1();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }).thenAccept(results -> {
        try {
            if (results.size() > 0) {
                csvWriter.uploadAsCsv(results);
            } else {
                log.info(" No data found..");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}
And this is what csvWriter.uploadAsCsv(results) looks like:
public <T> void uploadAsCsv(List<T> objectList) throws Exception {
    long objListSize = ((objectList == null) ? 0 : objectList.size());
    log.info("Action=Start, objectListSize=" + objListSize);
    ByteArrayInputStream inputStream = getCsvAsInputStream(objectList);
    Info fileInfo = someClient.uploadFile(inputStream);
    log.info("Action=Done, FileInfo=" + ((fileInfo == null ? null : fileInfo.getID())));
}
I am using OpenCSV here to convert the data to a CSV stream, and I can always see the last log line.
Expected results: All data fetched, CSVs generated, and CustomResponse returned as processed with no error message.
Actual results: All data fetched, CSVs generated, and the main thread hangs.
Upvotes: 4
Views: 8871
Reputation: 4365
You can use join on each created CompletableFuture without sacrificing concurrency:
public CustomResponse process() {
    CustomResponse msgResponse = new CustomResponse();
    List<CompletableFuture<Void>> futures = Arrays.asList(
            dataHelper.fetchAndUploadCSV1(),
            dataHelper.fetchAndUploadCSV2(),
            dataHelper.fetchAndUploadCSV3(),
            dataHelper.fetchAndUploadCSV4(),
            dataHelper.fetchAndUploadCSV5(),
            dataHelper.fetchAndUploadCSV6(),
            dataHelper.fetchAndUploadCSV7());
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> {
                msgResponse.setProcessed(true);
                msgResponse.setMessageStatus("message");
                return msgResponse;
            })
            .exceptionally(throwable -> {
                msgResponse.setMessageStatus("ERROR");
                msgResponse.setErrorMessage("error");
                return msgResponse;
            }).join();
}
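allOf returns a new CompletableFuture that is completed when all of the given CompletableFutures complete. So, by the time the thenApply callback runs, every one of the given futures has already completed, and a join on any of them at that point returns immediately. In essence, joining is happening to already completed futures. The single join() at the end of the chain still blocks the calling thread, but only until the slowest future finishes, so the futures keep running concurrently. Also, to handle possible exceptions, exceptionally should be invoked.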
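The same allOf / thenApply / exceptionally chain can be tried in isolation with a small, self-contained sketch. It makes some assumptions: the class AllOfDemo and the method runAll are hypothetical names, a plain String stands in for CustomResponse, and the futures in main are trivial placeholders for the fetchAndUploadCSV calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original code.
public class AllOfDemo {

    // Combines the futures and maps the overall outcome to a status string.
    static String runAll(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Runs only if every future completed normally.
                .thenApply(v -> "processed")
                // Runs if any future (or the stage above) failed.
                .exceptionally(throwable -> "error")
                // Blocks until the whole chain has produced a value.
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok1 = CompletableFuture.runAsync(() -> { });
        CompletableFuture<Void> ok2 = CompletableFuture.runAsync(() -> { });
        CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("boom");
        });

        System.out.println(runAll(Arrays.asList(ok1, ok2)));     // prints processed
        System.out.println(runAll(Arrays.asList(ok1, failing))); // prints error
    }
}
```

Because exceptionally supplies a fallback value, the final join() in this sketch does not throw a CompletionException when one of the futures fails; it returns the fallback instead.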
Upvotes: 2