Reputation: 2447
I'm having an issue with stream().forEach
it doesn't finish in time before the method returns, here's how :
My entity :
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
private String a;
private int b;
private int c;
private String d;
private String e;
}
I have a method that calls an external service that gets a list of Foo
and then for each member of that list it calls two other external services to fill d
and e
fields :
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
Arrays.stream(fooArray).forEach((f) -> {
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
}));
});
List<Foo> fooList = Arrays.asList(fooArray);
return fooList; // when this statement is reached d and e fields are null.
}
Because of some performance issues (and some best practices) I'm calling externalServiceTwo.getDeeEntity
and externalServiceThree.getEeeEntity
asynchronously with a custom supplier to embark some dependencies when invoking the services. But the main issue is that when returning fooList
d
and e
fields are null.
My question is how to wait for all async executions fo finish before returning fooList
?
Upvotes: 0
Views: 1474
Reputation: 510
May be try something like this: Send the request Async and create the List of Completable Futures of type Foo
and then using join
as a terminal operation as @boobalan suggested; you get the results in this way you can wait for all async executions fo finish before returning fooList.
List<foo> fooData = externalServiceOne.getFooList();
//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}
private CompletableFuture<Foo> computeAsync(Foo f) {
return CompletableFuture.supplyAsync(() ->
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f
});
private List<Foo> processResponse() {
List<CompletableFuture<Foo>> futureResult = sendRequest();
List<Foo> fooList = futureResult.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return fooList;
}
}
Upvotes: 1
Reputation: 4723
You could just remember the async tasks and wait for their completion manually:
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each
Arrays.stream(fooArray).forEach((f) -> {
futures.add(CompletableFuture.supplyAsync(
AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})));
});
for (CompletableFuture<Foo> future : futures) {
future.join(); // wait for each
}
List<Foo> fooList = Arrays.asList(fooArray);
return fooList;
}
Upvotes: 1
Reputation: 1199
Why don't you create all CompletableFuture
s as an intermediate operation and wait for all async executions to finish in the terminal operation.
Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})
)).forEach(fooCompletableFuture -> {
try {
fooCompletableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
Upvotes: 1