bnGG

Reputation: 371

Java Executor get first result that matches condition

Is there a way to stop Java's ExecutorService when a Callable returns a result that matches a condition?

I have the following algorithm (code A):

MyObject functionA(SomeObject someObject) {
    for(int i = 0; i < 100; ++i) {
        MyObject result = someFunction(i, someObject);
        if (result != null) {
            return result;
        }
    }
    return null;
}

To speed this up, I changed the code to (code B):

MyObject functionB(SomeObject someObject) throws InterruptedException, ExecutionException {
    final ExecutorService executorService = Executors.newCachedThreadPool();
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            return someFunction(finalI, someObject);
        };
        callableList.add(newCallable);
    }
    List<Future<MyObject>> futures = executorService.invokeAll(callableList);
    for(int i = 0; i < futures.size(); ++i) {
        Future<MyObject> future = futures.get(i);
        if(future.get() != null) {
            return future.get();
        }
    }
    return null;
}

On average functionB runs 5 times as fast as functionA (on 8 cores).

This is good, but not perfect. In functionA someFunction() is called about 20 times on average before a result != null is found. In functionB someFunction() is always called 100 times.

Is there a way to

a) stop executorService, when the first thread finishes with a result != null

or better

b) stop executorService when a thread has found a result != null and all threads with a lower finalI value than that thread have finished with result == null

thanks

------- EDIT ---------

Thanks to the answers of Alex Crazy and Saxon I am able to answer my question A.

  1. All Callables whose result does not meet my condition should throw an exception
  2. I should use invokeAny instead of invokeAll
  3. I should use newFixedThreadPool instead of newCachedThreadPool

Using a FixedThreadPool is important, because a CachedThreadPool runs all 100 tasks in parallel. A FixedThreadPool runs only as many tasks in parallel as you define. New tasks are started only when old tasks have finished.

If you don't check whether your callable was interrupted (because invokeAny found a result), all callables that are already running will run until they finish. So you get your result faster with invokeAny, but the processors stay busy until those running tasks have finished.

So my new function looks like this:

MyObject functionC(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<Callable<MyObject>> callableList = new ArrayList<Callable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        Callable<MyObject> newCallable = () -> {
            MyObject result = someFunction(finalI, someObject);
            if(result == null) {
                throw new Exception();
            }
            return result;
        };
        callableList.add(newCallable);
    }
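    try {
        // invokeAny returns the first successful result and cancels the remaining tasks
        return executorService.invokeAny(callableList);
    } catch (InterruptedException | ExecutionException e) {
        // an ExecutionException here means every task threw, i.e. no result was found
        return null;
    } finally {
        executorService.shutdownNow(); // release the pool threads
    }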
}

functionC solves my question A, but it does not always return the MyObject that someFunction() created for the lowest value of finalI. For example, suppose someFunction(3, someObject) and someFunction(6, someObject) both return a MyObject that is not null.
In some runs the result of someFunction(3, someObject) is returned, and in other runs the result of someFunction(6, someObject) is returned. It is not deterministic.

So I wrote the new class ResultCallable

public abstract class ResultCallable<E>  implements Callable<E> {
    E result = null;
}

and functionD

MyObject functionD(SomeObject someObject) {
    final ExecutorService executorService = Executors.newFixedThreadPool(8);
    List<ResultCallable<MyObject>> callableList = new ArrayList<ResultCallable<MyObject>>();
    for(int i = 0; i < 100; ++i) {
        final int finalI = i;
        // A lambda cannot implement an abstract class, so use an anonymous subclass.
        callableList.add(new ResultCallable<MyObject>() {
            @Override public MyObject call() throws Exception {
                this.result = someFunction(finalI, someObject);
                if(this.result == null) throw new Exception();
                return this.result;
            }
        });
    }
    try {
        MyObject result = executorService.invokeAny(callableList);
        executorService.shutdownNow();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        for(int i = 0; i < callableList.size(); ++i) {
            if(callableList.get(i).result != null) {
                return callableList.get(i).result;
            }
        }
    } catch (InterruptedException | ExecutionException e) {
        return null;
    } finally {
        executorService.shutdownNow(); // also stops the pool when no task succeeded
    }
    return null;
}

If someFunction(3, someObject) and someFunction(6, someObject) both return a MyObject that is not null, the main thread now waits, after the first valid result, for all other started tasks to finish (with newFixedThreadPool(8) there are up to 7 other tasks running). Because the executorService runs the callables in the order they appear in callableList, all callables with a lower finalI value than the callable which returned the first valid result have already been started and will finish before line 19 (the for-loop) is reached.

This is good, but not perfect.
Suppose the task with finalI = 6 finishes first and the tasks with finalI from 0 to 5 and 7 are still running. Then, because of executorService.shutdownNow();, no other tasks are started and the main thread waits until all running tasks have finished.
But I only need to wait for the tasks with finalI between 0 and 3 to finish: task 3 because it also has a result != null, and tasks 0, 1 and 2 to prove that there are no results below task 3.

Could I replace executorService.awaitTermination and the for-loop with something like the following?

for(int i = 0; i < callableList.size(); ++i) {
    awaitTermination(callableList.get(i));
    if(callableList.get(i).result != null) {
        return callableList.get(i).result;
    }
}

awaitTermination(callableList.get(i)); is pseudocode. I couldn't find a function to wait till a callable is executed.

----------------------- edit 2 ------------------------

I solved it myself.

I added final CountDownLatch countDownLatch = new CountDownLatch(1); to my class ResultCallable.
After this.result = someFunction(finalI, someObject); I added this.countDownLatch.countDown(); and awaitTermination(callableList.get(i)); got replaced by callableList.get(i).countDownLatch.await();
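A minimal sketch of the latch idea described above, not the exact code from functionD. It departs from functionD in two ways, both assumptions of this sketch: the tasks are started with submit() rather than invokeAny, so that no task is cancelled before the scan reaches it (a cancelled task would never release its latch), and someFunction is passed in as an IntFunction parameter. The class name LowestIndexSearch and the method findLowest are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntFunction;

public class LowestIndexSearch {

    // ResultCallable from the question, extended with the latch from edit 2.
    abstract static class ResultCallable<E> implements Callable<E> {
        E result = null; // safely published by countDown()/await()
        final CountDownLatch countDownLatch = new CountDownLatch(1);
    }

    // Returns the non-null result with the lowest index, or null if there is none.
    // someFunction is a parameter here (assumption of this sketch).
    static <T> T findLowest(int taskCount, int threads, IntFunction<T> someFunction) {
        ExecutorService executorService = Executors.newFixedThreadPool(threads);
        List<ResultCallable<T>> callableList = new ArrayList<>();
        for (int i = 0; i < taskCount; ++i) {
            final int finalI = i;
            ResultCallable<T> newCallable = new ResultCallable<T>() {
                @Override
                public T call() {
                    try {
                        this.result = someFunction.apply(finalI);
                    } finally {
                        // Release the waiter even if someFunction throws.
                        this.countDownLatch.countDown();
                    }
                    return this.result;
                }
            };
            callableList.add(newCallable);
            executorService.submit(newCallable); // queued in index order
        }
        try {
            for (ResultCallable<T> callable : callableList) {
                callable.countDownLatch.await(); // wait for this one task only
                if (callable.result != null) {
                    return callable.result; // every lower index returned null
                }
            }
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt for the caller
            return null;
        } finally {
            // Drop tasks that have not started and interrupt the running ones.
            executorService.shutdownNow();
        }
    }
}
```

For example, `LowestIndexSearch.findLowest(100, 8, i -> (i == 3 || i == 6) ? Integer.valueOf(i * 10) : null)` returns 30, regardless of whether task 3 or task 6 finishes first. The Future returned by submit() could play the same role as the latch, since Future.get() blocks until that one task has finished.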

If you have better ideas, let me know.

Upvotes: 1

Views: 723

Answers (2)

Alex Crazy

Reputation: 252

The proper way to solve your problem is:

  1. Add a variable such as private volatile boolean isFinished that is shared between all threads, and check periodically whether its value has changed

  2. If possible, make your Callable look like this:

     public class SomeRunnableFunction implements Callable {
    
          private volatile boolean running = true;
    
          public void terminate() {
              running = false;
          }
    
          @Override
          public Object call() throws Exception {
               while (running) {
                    // your custom logic
                    return new Object();
               }
               return null;
          }
    }
    
  3. When one thread has finished, set isFinished to true and then cancel all tasks that have already started by calling someRunnableFunction.terminate()

  4. Alternatively, you can use future.cancel(true) instead of someRunnableFunction.terminate(); then your callable will look like this:

     public class SomeRunnableFunction implements Callable<Object> {
    
          @Override
          public Object call() throws Exception {
               while (true) {
                    // Check regularly whether the thread has been
                    // interrupted and, if so, throw an exception to stop
                    // the task immediately
                    if (Thread.currentThread().isInterrupted()) {
                         throw new InterruptedException("Thread interrupted");
                    }
                    // your custom logic
                    return new Object();
               }
               // no statement may follow while (true): it would be unreachable
          }
    }
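To illustrate point 4, here is a small sketch of how Future.cancel(true) and the interruption check work together. The class CancelDemo, the method cancelRunningTask and the helper latches are hypothetical names introduced only for this example; the empty loop body stands in for one slice of your real work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDemo {

    // Returns true if the running task noticed the interrupt and stopped.
    static boolean cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        Callable<Object> task = () -> {
            started.countDown();
            try {
                // Keep working until the worker thread is interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    // one slice of the real work would go here
                }
                sawInterrupt.set(true);
                throw new InterruptedException("Thread interrupted");
            } finally {
                stopped.countDown();
            }
        };

        try {
            Future<Object> future = executor.submit(task);
            started.await();     // make sure the task is actually running
            future.cancel(true); // true = interrupt the thread running the task
            boolean finished = stopped.await(5, TimeUnit.SECONDS);
            return finished && sawInterrupt.get() && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that cancel(true) only requests the stop by interrupting the thread. A task that never checks the interrupt flag (and never calls a blocking method that throws InterruptedException) keeps running until it finishes on its own.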
    

Upvotes: 2

Saxon

Reputation: 937

Why not use invokeAny instead of invokeAll? It is enough to throw an exception instead of returning null when the result is bad; the ExecutorService call then returns a MyObject instead of a List<Future>.

Upvotes: 2
