Reputation: 1634
I am new Completable Future. I am trying to call a method parallel for a list of elements (which are arguments) and then combine the results to create a final response. I am also trying to set up timeout of 50 ms so that if the call doesn't return in 50 ms I will return a default value.
So far I ve tried this :
{
List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());
try {
List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
CompletableFuture.runAsync(() -> {
final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup); //call which I am tryin to make parallel
// this is thread safe
if (null != itemGroup) {
result.add(itemGroup); //output of the call
}
}, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup)) //this line throws error
.collect(Collectors.toList());
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
.join();
} catch (final Throwable t) {
final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
log.error(errorMsg, e);
result = response.getItemGroupList(); //default value - return the input value if error
}
Response finalResponse = Response.builder()
.itemGroupList(result)
.build();
}
private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
//Threadpool with 1 thread for scheduling a future that completes after a timeout
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
return result;
}
But I keep getting error saying :
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
[javac] itemGroup))
incompatible types: inference variable T has incompatible bounds
[javac] .collect(Collectors.toList());
[javac] ^
[javac] equality constraints: CompletableFuture
[javac] lower bounds: Object
[javac] where T is a type-variable:
Can some one please tell me what I am doing wrong here ? And please correct me if I am going in the wrong direction.
Thanks.
Upvotes: 1
Views: 5659
Reputation: 298153
Instead of
acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))
you’d need
applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)
to compile the code. “accept” is an action consuming a value without returning a new value, “apply” is an action that produces a new value.
However, there’s still a logical error. The future returned by timeoutAfter
will be completed exceptionally, so dependent stages will get completed exceptionally as well, without evaluating functions, so this chaining method is not suitable for replacing an exception with a default value.
Even worse, fixing this would create a new future which gets completed by either source future, but that does not affect the result.add(itemGroup)
action performed in one of the source futures. In your code, the resulting future is only used to wait for the completion, but not for evaluating the result. So when your timeout elapses, you would stop waiting for the completion, whereas there still might be background threads modifying the list.
The correct logic is to separate the steps of fetching the value, which can get superseded by a default value on timeout, and the step of adding the result, either the fetched value or default value, to the result list. Then, you can wait for the completion of all add
actions. On a timeout, there might be still ongoing getUpdatedItemGroup
evaluations (there is no way to stop their execution), but their result would be ignored, so it doesn’t affect the result list.
It’s also worth pointing out that creating a new ScheduledExecutorService
for every list element (that is not shut down after use, to make matters worse), is not the right approach.
// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
CompletableFuture.supplyAsync(
() -> getUpdatedItemGroup(inPutItemGroup), executorService),
inPutItemGroup)
.thenAccept(itemGroup -> {
// this is thread safe, but questionable,
// e.g. the result list order is not maintained
if(null != itemGroup) result.add(itemGroup);
})
)
.toArray(CompletableFuture<?>[]::new);
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
String errorMsg = String.format("Exception occurred while executing parallel call");
log.error(errorMsg, e);
endResult = response.getItemGroupList();
}
finally {
delayer.shutdown();
}
Response finalResponse = Response.builder()
.itemGroupList(endResult)
.build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {
es.schedule(() -> f.complete(value), timeout, unit);
return f;
}
Here, the supplyAsync
produces a CompletableFuture
which will provide the result of the getUpdatedItemGroup
evaluation. The timeoutAfter
invocation will schedule a completion with a default value after the timeout, without creating a new future, then, the dependent action chained via thenAccept
will add the result value to the result
list.
Note that a synchronizedList
allows adding elements from multiple threads, but adding from multiple threads will result in an unpredictable order, unrelated to the order of the source list.
Upvotes: 2
Reputation: 19926
The signature of acceptEither
looks like this:
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other,
Consumer<? super T> action
) {
And the line which is throwing the error looks like this:
.acceptEither(
timeoutAfter(50, TimeUnit.MILLISECONDS),
inPutItemGroup
)
So you see that you try to pass an ItemGroup
as a Consumer<? super T>
where T
was infered to be Void
and hence you get the expected error:
error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
Upvotes: 0