user3407267
user3407267

Reputation: 1634

Completable Future with Timeout not working

I am new Completable Future. I am trying to call a method parallel for a list of elements (which are arguments) and then combine the results to create a final response. I am also trying to set up timeout of 50 ms so that if the call doesn't return in 50 ms I will return a default value.

So far I ve tried this :

    {

     List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());

    try {
     List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
     .map(inPutItemGroup -> 
       CompletableFuture.runAsync(() -> {
           final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup);               //call which I am tryin to make parallel

           // this is thread safe
           if (null != itemGroup) {
                  result.add(itemGroup); //output of the call
           }
        }, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup))  //this line throws error     
     .collect(Collectors.toList());

// this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
                        .join();
} catch (final Throwable t) {
     final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
                log.error(errorMsg, e);
                result = response.getItemGroupList(); //default value - return the input value if error
    }

    Response finalResponse = Response.builder()
                    .itemGroupList(result)
                    .build();

    }

     private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
            CompletableFuture<T> result = new CompletableFuture<T>();

            //Threadpool with 1 thread for scheduling a future that completes after a timeout
            ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
            String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
            delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
            return result;
     }

But I keep getting error saying :

 error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
    [javac]                             itemGroup))

incompatible types: inference variable T has incompatible bounds
    [javac]                     .collect(Collectors.toList());
    [javac]                             ^
    [javac]     equality constraints: CompletableFuture
    [javac]     lower bounds: Object
    [javac]   where T is a type-variable:

Can some one please tell me what I am doing wrong here ? And please correct me if I am going in the wrong direction.

Thanks.

Upvotes: 1

Views: 5659

Answers (2)

Holger
Holger

Reputation: 298153

Instead of

acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))

you’d need

applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)

to compile the code. “accept” is an action consuming a value without returning a new value, “apply” is an action that produces a new value.

However, there’s still a logical error. The future returned by timeoutAfter will be completed exceptionally, so dependent stages will get completed exceptionally as well, without evaluating functions, so this chaining method is not suitable for replacing an exception with a default value.

Even worse, fixing this would create a new future which gets completed by either source future, but that does not affect the result.add(itemGroup) action performed in one of the source futures. In your code, the resulting future is only used to wait for the completion, but not for evaluating the result. So when your timeout elapses, you would stop waiting for the completion, whereas there still might be background threads modifying the list.

The correct logic is to separate the steps of fetching the value, which can get superseded by a default value on timeout, and the step of adding the result, either the fetched value or default value, to the result list. Then, you can wait for the completion of all add actions. On a timeout, there might be still ongoing getUpdatedItemGroup evaluations (there is no way to stop their execution), but their result would be ignored, so it doesn’t affect the result list.

It’s also worth pointing out that creating a new ScheduledExecutorService for every list element (that is not shut down after use, to make matters worse), is not the right approach.

// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
    CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
    .map(inPutItemGroup ->
        timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
            CompletableFuture.supplyAsync(
                () -> getUpdatedItemGroup(inPutItemGroup), executorService),
            inPutItemGroup)
         .thenAccept(itemGroup -> {
            // this is thread safe, but questionable,
            // e.g. the result list order is not maintained
            if(null != itemGroup) result.add(itemGroup);
         })
    )
    .toArray(CompletableFuture<?>[]::new);

    // this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
    String errorMsg = String.format("Exception occurred while executing parallel call");
    log.error(errorMsg, e);
    endResult = response.getItemGroupList();
}
finally {
    delayer.shutdown();
}

Response finalResponse = Response.builder()
    .itemGroupList(endResult)
    .build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
    long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {

    es.schedule(() -> f.complete(value), timeout, unit);
    return f;
}

Here, the supplyAsync produces a CompletableFuture which will provide the result of the getUpdatedItemGroup evaluation. The timeoutAfter invocation will schedule a completion with a default value after the timeout, without creating a new future, then, the dependent action chained via thenAccept will add the result value to the result list.

Note that a synchronizedList allows adding elements from multiple threads, but adding from multiple threads will result in an unpredictable order, unrelated to the order of the source list.

Upvotes: 2

Lino
Lino

Reputation: 19926

The signature of acceptEither looks like this:

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, 
    Consumer<? super T> action
) {

And the line which is throwing the error looks like this:

.acceptEither(
     timeoutAfter(50, TimeUnit.MILLISECONDS),
     inPutItemGroup
)

So you see that you try to pass an ItemGroup as a Consumer<? super T> where T was infered to be Void and hence you get the expected error:

error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>

Upvotes: 0

Related Questions