zafar142003
zafar142003

Reputation: 2159

How do I cancel spawned Callables after a Future.cancel?

I have an ExecutorService to which I submit n Callables. In each of these n Callables, I spawn new Callables which are submitted to the same ExecutorService. When I encounter a java.util.concurrent.TimeoutException on any of the first n Callables at their respective Future.get(), I do Future.cancel(true) on that Callable. I also need to cancel the tasks which I submitted because of these cancelled Callables.

One way I could do this is to store the spawned Callables in a map of lists somewhere up in the call stack, and while cancelling the Callable, use it to find its children and cancel them too. But this does not seem a very good solution to me, as the logic of cancellation should reside somewhere closer.

Is there a handler/a better way that could give me this flexibility?

Upvotes: 1

Views: 1990

Answers (1)

Gray
Gray

Reputation: 116938

When I encounter a java.util.concurrent.TimeoutException on any of the first n Callables at their respective Future.get(), I do Future.cancel() on that Callable. I also need to cancel the tasks which I submitted because of these cancelled Callables.

When you call future.cancel(false) it will only stop the job from executing if it is not already running. You wouldn't have a problem in this case because no "sub-jobs" have been created.

So I assume you are talking about future.cancel(true) which will interrupt the thread if the job is already running. It is important to realize that this interruption only affects the few methods that throw InterruptedException – like Thread.sleep(...), obj.wait(...), etc.. Otherwise you will need to test for the interrupt flag in your code. For example:

while (!Thread.currentThread().isInterrupted()) {
   ...
}

What I would do is to have each of your jobs just keep a list of the Futures "sub-jobs" that it has spawned. When it catches an InterruptedException or when it notices that its thread interrupt flag has been set then it can call future.cancel(true) on all of the jobs it forked.

Something like:

final List<Future<...>> subJobs = new ArrayList<>();
...
while (true) {
   if (Thread.currentThread().isInterrupted()) {
      cleanupSubJobs(subJobs);
      break;
   }
   try {
      Thread.sleep(...);
   } catch (InterruptedException ie) {
      // always a good pattern
      Thread.currentThread.interrupt();
      cleanupSubJobs(subJobs);
      return;
   }
   ...
   if (weNeedSubJob) {
      subJobs.add(threadPool.submit(...));
   }
}
...
private void cleanupSubJobs(List<Future<...>> subJobs) {
   for (Future<...> subJob : subJobs) {
      subJob.cancel(true);
   }
}

Upvotes: 3

Related Questions