Reputation: 371
Is there a way to stop Javas ExecuterService, when a Callable returns with a result that matches a condition?
I have the following algorithm (code A):
MyObject functionA(SomeObject someObject) {
for(int i = 0; i < 100; ++i) {
MyObject result = someFunction(i, someObject);
if (result != null) {
return result;
}
}
return null;
}
To speed this up, I changed the code to (code B):
MyObject functionB(SomeObject someObject) {
final ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
return someFunction(finalI, someObject);
};
callableList.add(newCallable);
}
List<Future<MyObject>> futures = executorService.invokeAll(callableList);
for(int i = 0; i < futures.size(); ++i) {
Future<MyObject> future = futures.get(i);
if(future.get() != null) {
return future.get();
}
}
return null;
}
On average functionB runs 5 times as fast as functionA (on 8 cores).
This is good, but not perfect. In functionA someFunction() is called about 20 times on average before a result != null is found. In functionB someFunction() is always called 100 times.
Is there a way to
a) stop executorService, when the first thread finishes with a result != null
or better
b) stop executorService, when a thread finishes and a result != null was found by a thread and all threads, with a lower finalI value than the thread which found the result != null, were finished with reuslt == null
thanks
------- EDIT ---------
Thanks to the answers of Alex Crazy and Saxon I am able to answer my question A.
The use of FixedThreadPool is important, because CachedThreadPool runs all 100 tasks parallel. FixedThreadPool only runs as many tasks parallel as you define. New tasks are only started, when old tasks are finished.
If you don’t check if your callable was interrupted (because invokeAny found a result), then all callables will run till they finish. So you get your result faster with invokeAny, but the processors are as long working as they would with invokeAll.
So my new function looks like this:
MyObject functionC(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
Callable<MyObject> newCallable = () -> {
MyObject result = someFunction(finalI, someObject);
if(result == null) {
throw new Exception();
}
return result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
return result;
} catch (InterruptedException | ExecutionException e) {
return null;
}
}
functionC solves my question A, but it does not always return the MyObject object which was created by someFunction() with the lowest input value of finalI.
E.g. someFunction(3, someObject) and someFunction(6, someObject) would return an MyObject object which is not null.
In some runs the result of someFunction(3, someObject) would be returned and in other runs the result of someFunction(6, someObject) would be returned.
It is not deterministic.
So I wrote the new class ResultCallable
public abstract class ResultCallable<E> implements Callable<E> {
E result = null;
}
and functionD
MyObject functionD(SomeObject someObject) {
final ExecutorService executorService = Executors.newFixedThreadPool(8);
List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
for(int i = 0; i < 100; ++i) {
final int finalI = i;
ResultCallable<MyObject> newCallable = () -> {
this.result = someFunction(finalI, someObject);
if(this.result == null) {
throw new Exception();
}
return this.result;
};
callableList.add(newCallable);
}
try {
MyObject result = executorService.invokeAny(callableList);
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.SECONDS);
for(int i = 0; i < callableList.size(); ++i) {
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
} catch (InterruptedException | ExecutionException e) {
return null;
}
return null;
}
If someFunction(3, someObject)
and someFunction(6, someObject)
would return an MyObject object which is not null, now the main thread waits after the first valid result for all other started tasks to finish (with newFixedThreadPool(8) there are up to 7 other tasks running).
Because the executorService runs the callables in the order they are in callableList, all callables with an lower finalI value than the callable which returned the first valid result have been started and would finish before line 19.
This is good, but not perfect.
Suppose the task with finalI = 6
finishes first and the tasks with finalI from 0 to 5 and 7 are still running.
Then because executorService.shutdownNow(); no other tasks would start and the main thread would wait till all running tasks finish.
But I only need to wait for the tasks with finalI between 0 and 3 to finish. Tasks 3 because it also has a result != null and tasks 0, 1 and 2 to prove that there are no results below task 3.
Could I replace executorService.awaitTermination and the for-loop with something like the following?
for(int i = 0; i < callableList.size(); ++i) {
awaitTermination(callableList.get(i));
if(callableList.get(i).result != null) {
return callableList.get(i).result;
}
}
awaitTermination(callableList.get(i));
is pseudocode. I couldn't find a function to wait till a callable is executed.
----------------------- edit 2 ------------------------
I solved it myself.
I added final CountDownLatch countDownLatch = new CountDownLatch(1);
to my class ResultCallable.
After this.result = someFunction(finalI, someObject);
I added this.countDownLatch.countDown();
and awaitTermination(callableList.get(i));
got replaced by callableList.get(i).countDownLatch.await();
If you have better ideas, let me know.
Upvotes: 1
Views: 723
Reputation: 252
The propper way to solve your problem is:
To make somewhere additional varaible like private boolean isFinished
which will be shared between all threads and check if this varaible changes its value pereodically
If it is possible you have to make your Callable looks like this:
public class SomeRunnableFunction implements Callable {
private volatile boolean running = true;
public void terminate() {
running = false;
}
@Override
public Object call() throws Exception {
while (running) {
// your custom logic
return new Object();
}
return null;
}
}
So then one thread is finished change private boolean isFinished
to true and then you have to cancel all already started tasks by calling someRunnableFunction.terminate()
Also you can use feature.cancel()
insted of someRunnableFunction.terminate()
then yor callable will look like:
public class SomeRunnableFunction implements Callable {
@Override
public Object call() throws Exception {
while (true) {
// Check regularly if the thread has been
// interrupted and if so throws an exception to stop
// the task immediately
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread interrupted");
}
// your custom logic
return new Object();
}
return null;
}
}
Upvotes: 2
Reputation: 937
Why not to use invokeAny instead of invokeAll. It's enough to throw an exception instead of returning null when the result is bad, and return MyObject instead of List<Future> on ExecutorService invocation.
Upvotes: 2