Fourat
Fourat

Reputation: 2447

How to wait for enclosed asynchronous block to finish

I'm having an issue with stream().forEach it doesn't finish in time before the method returns, here's how :

My entity :

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
    private String a;
    private int b;
    private int c;
    private String d;
    private String e;
}

I have a method that calls an external service that gets a list of Foo and then for each member of that list it calls two other external services to fill d and e fields :

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();
    Arrays.stream(fooArray).forEach((f) -> {
        CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        }));
    });
    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList; // when this statement is reached d and e fields are null.
}

Because of some performance issues (and some best practices) I'm calling externalServiceTwo.getDeeEntity and externalServiceThree.getEeeEntity asynchronously with a custom supplier to embark some dependencies when invoking the services. But the main issue is that when returning fooList d and e fields are null.

My question is how to wait for all async executions fo finish before returning fooList ?

Upvotes: 0

Views: 1474

Answers (3)

ajain
ajain

Reputation: 510

May be try something like this: Send the request Async and create the List of Completable Futures of type Foo and then using join as a terminal operation as @boobalan suggested; you get the results in this way you can wait for all async executions fo finish before returning fooList.


List<foo> fooData = externalServiceOne.getFooList();

//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
    return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}

private CompletableFuture<Foo> computeAsync(Foo f) {
    return CompletableFuture.supplyAsync(() -> 
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f 
           });

private List<Foo> processResponse() {
   List<CompletableFuture<Foo>> futureResult = sendRequest();

   List<Foo> fooList = futureResult.stream()
            .map(CompletableFuture::join)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
     return fooList;
    }

}

Upvotes: 1

akuzminykh
akuzminykh

Reputation: 4723

You could just remember the async tasks and wait for their completion manually:

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();

    final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each

    Arrays.stream(fooArray).forEach((f) -> {
        futures.add(CompletableFuture.supplyAsync(
                AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        })));
    });

    for (CompletableFuture<Foo> future : futures) {
        future.join(); // wait for each
    }

    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList;
}

Upvotes: 1

Alanpatchi
Alanpatchi

Reputation: 1199

Why don't you create all CompletableFutures as an intermediate operation and wait for all async executions to finish in the terminal operation.

Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f;
            })
    )).forEach(fooCompletableFuture -> {
        try {
            fooCompletableFuture.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });

Upvotes: 1

Related Questions