Reputation: 6629
I am investigating the CompletionService class, and I find the decoupling of the submission queue from the completion queue really useful.
But I miss a way to poll/take cancelled tasks as well (which could be considered completed, in a way). Is there an easy way to do this?
Future<String> task1Future = completionService.submit(myCallableTask1);
Future<String> task2Future = completionService.submit(myCallableTask2);
task2Future.cancel(true);
Future<String> lastComplTaskFuture = completionService.take();
// Seems to return only the completed or
// interrupted tasks, not the cancelled ones (task2)
EDIT: After checking some of the answers, I realized what is happening. The CompletionService appears to return futures in the same order as the jobs were submitted. If you run jobs a, b and c, cancel b and c while a is still working, and then poll the CompletionService, the cancellation of b and c is not reported until a has terminated. I also realized that if you shut down the executor instead of cancelling individual tasks, the tasks still in the queue never reach the CompletionService, even if their cancellation has finished while the active tasks have not ended yet.
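The shutdown observation can be reproduced with a small sketch. It assumes a single-threaded executor; the class name ShutdownDemo and the run() helper are made up for illustration. shutdownNow() hands back the tasks that never started, and those never show up in the completion queue because nothing ever runs them:

```java
import java.util.List;
import java.util.concurrent.*;

public class ShutdownDemo {
    /** Returns { tasks handed back by shutdownNow(), futures reported by the completion service }. */
    static int[] run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);

        completion.submit(() -> { Thread.sleep(2000); return "A"; }); // occupies the only worker
        completion.submit(() -> "B");                                  // waits in the executor's queue
        completion.submit(() -> "C");                                  // waits in the executor's queue

        // Interrupts the running task and drains the tasks that never started.
        List<Runnable> neverStarted = executor.shutdownNow();

        // Count what the completion service reports; stop once nothing arrives for 500 ms.
        int reported = 0;
        while (completion.poll(500, TimeUnit.MILLISECONDS) != null) {
            reported++;
        }
        return new int[] { neverStarted.size(), reported };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run();
        System.out.println("never started: " + counts[0] + ", reported: " + counts[1]);
    }
}
```

In this setup the interrupted task A is the only one the completion service reports (its future fails with an InterruptedException as the cause); B and C are only reachable through the list returned by shutdownNow().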
EDIT2: OK, I added a complete test case:
import java.util.concurrent.*;
public class CompletionTest {
    public static void main(String[] args) {
        CompletionService<String> completion =
                new ExecutorCompletionService<String>(Executors.newSingleThreadExecutor());
        Future<String> aFuture =
                completion.submit(() -> { Thread.sleep(10000); return "A"; });
        completion.submit(() -> { return "B"; }).cancel(true);
        completion.submit(() -> { return "C"; }).cancel(true);
        completion.submit(() -> { return "D"; }).cancel(true);
        long arbitraryTime = System.currentTimeMillis();
        while (true) {
            String result = getNextResult(completion);
            System.out.println(result);
            if (System.currentTimeMillis() > arbitraryTime + 5000) {
                aFuture.cancel(true);
            }
        }
    }
    public static String getNextResult(CompletionService<String> service) {
        try {
            String taskId = service.take().get();
            return "'" + taskId + "' job completed successfully.";
        }
        catch (InterruptedException e) {
            return "Unknown job was interrupted/stopped while waiting (no thread.wait() or infinite loop in the job so I guess this should not be possible).";
        }
        catch (CancellationException e) {
            return "Unknown job was cancelled.";
        }
        catch (ExecutionException e) {
            return "Unknown job returned with the following internal exception while running: " + e.getCause().getMessage();
        }
    }
}
I expected an output like:
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
'A' job completed successfully.
But instead it was:
'A' job completed successfully.
Unknown job was cancelled.
Unknown job was cancelled.
Unknown job was cancelled.
I even tried to use a PriorityBlockingQueue as the queue for the CompletionService, using Comparator.<Future<String>,Boolean>comparing(Future::isCancelled).reversed(),
but that did not work either (I guess the queue does not re-sort when an element changes state).
Upvotes: 2
Views: 725
Reputation: 279960
You're submitting tasks to a thread pool with a single thread.
The first task you submit will occupy that thread (sleeping for 10 seconds).
The others will be queued within the ExecutorService. Cancelling the corresponding futures does not remove the corresponding task from that queue. It just changes the state of the Future.
The tasks will remain in the queue until a thread is free to process them. The thread will notice the corresponding Future is cancelled and cancel the entire task as well. That's when the CompletionService knows that they are "done", no sooner.
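A minimal sketch of that sequence, assuming the same single-threaded setup as in the question (the class name LateCancellationDemo and the run() helper are hypothetical). It cancels a queued task, checks the completion service right away, and then waits for the worker to catch up:

```java
import java.util.concurrent.*;

public class LateCancellationDemo {
    /** Returns a short log of what the completion service reports, and in which order. */
    static String run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> completion = new ExecutorCompletionService<>(executor);

            Future<String> a = completion.submit(() -> { Thread.sleep(300); return "A"; });
            Future<String> b = completion.submit(() -> "B");
            b.cancel(true); // changes b's state only; b's task stays in the executor's queue

            StringBuilder log = new StringBuilder();
            log.append("b cancelled: ").append(b.isCancelled()).append('\n');
            // Nothing has been handed to the completion queue yet.
            log.append("poll right after cancel: ").append(completion.poll()).append('\n');
            // The first future to arrive is a, once its sleep ends.
            log.append("first take is a: ").append(completion.take() == a).append('\n');
            // Only then does the worker reach b's task and report it as done.
            log.append("second take is b: ").append(completion.take() == b);
            return log.toString();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

The poll() right after cancel(true) comes back empty even though b.isCancelled() is already true, which is the gap between "the Future changed state" and "the CompletionService was told about it".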
Consider increasing the size of your thread pool.
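If a larger pool is not an option, another possibility is to skip ExecutorCompletionService and let each task report itself. The sketch below is only an illustration under that assumption, not something the JDK provides: SelfQueueingTask, submit and run are hypothetical names. It relies on FutureTask.done(), which is documented to be invoked when the task becomes done, whether normally or via cancellation, so a cancelled task is queued as soon as cancel() succeeds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class EagerCancellationDemo {
    /** Hypothetical helper: a FutureTask that queues itself once it is done or cancelled. */
    static final class SelfQueueingTask<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> finished;

        SelfQueueingTask(Callable<V> callable, BlockingQueue<Future<V>> finished) {
            super(callable);
            this.finished = finished;
        }

        @Override
        protected void done() {
            // Called on normal completion, on failure, and on cancellation.
            finished.add(this);
        }
    }

    /** Hypothetical helper: wraps the job, hands it to the executor, returns the future. */
    static <V> Future<V> submit(Executor executor, BlockingQueue<Future<V>> finished, Callable<V> job) {
        SelfQueueingTask<V> task = new SelfQueueingTask<>(job, finished);
        executor.execute(task);
        return task;
    }

    static List<String> run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlockingQueue<Future<String>> finished = new LinkedBlockingQueue<>();
        try {
            submit(executor, finished, () -> { Thread.sleep(300); return "A"; });
            submit(executor, finished, () -> "B").cancel(true);
            submit(executor, finished, () -> "C").cancel(true);
            submit(executor, finished, () -> "D").cancel(true);

            List<String> order = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                Future<String> next = finished.take();
                try {
                    order.add("'" + next.get() + "' job completed successfully.");
                } catch (CancellationException e) {
                    order.add("Unknown job was cancelled.");
                } catch (ExecutionException e) {
                    order.add("Job failed: " + e.getCause());
                }
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

With a single worker thread this reports the three cancellations first and 'A' last. The cancelled tasks still sit in the executor's queue until a worker skips over them, so this changes only when they are reported, not when they are removed.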
Upvotes: 1