js_248

Reputation: 2112

Get result from future without completing all

I am creating a fixed thread pool of 10 threads and submitting two kinds of jobs to it as follows.

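import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelAdder {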
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            List<Future<Integer>> list = new ArrayList<Future<Integer>>();
            for (int i = 0; i < 10; i++) {
                Future<Integer> future;
                if (i % 2 == 0) {
                    future = executor.submit(new Call1());
                } else {
                    future = executor.submit(new Call2());
                }

                list.add(future);

            }

            for(Future<Integer> fut : list) {
                System.out.println("From future is "+fut.get());
            }
        }
    }

class Call1 implements Callable<Integer> {
    public Integer call() throws Exception {
        System.out.println("Calling 1");
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 10;
    }

}

class Call2 implements Callable<Integer> {
    public Integer call() throws Exception {
        System.out.println("Calling 2");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 20;
    }

}

A Call2 job returns much faster than a Call1 job. I want to get the result of each job from my future list as soon as that job completes, without depending on the other jobs being done.
Here the result of Call2 is only printed after Call1 has finished. How can I solve this?

Upvotes: 1

Views: 463

Answers (1)

ernest_k

Reputation: 45319

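The problem is that the loop calls the blocking get() on each future in submission order, so it waits on the first (slow) Call1 future before it ever reaches the Call2 results: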

for(Future<Integer> fut : list) {
    System.out.println("From future is "+fut.get());
}
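To make the blocking behaviour concrete, here is a minimal sketch with shortened sleep times (500 ms and 10 ms are assumptions for the demo, and BlockingGetDemo and readInSubmissionOrder are hypothetical names). It records the order in which results are read when get() is called in submission order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingGetDemo {
    // Submits alternating slow (500 ms, result 10) and fast (10 ms, result 20) tasks,
    // then calls get() in submission order and records the order results are read.
    static List<Integer> readInSubmissionOrder() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                final boolean slow = (i % 2 == 0);
                futures.add(executor.submit(() -> {
                    Thread.sleep(slow ? 500 : 10);
                    return slow ? 10 : 20;
                }));
            }
            List<Integer> seen = new ArrayList<>();
            for (Future<Integer> f : futures) {
                seen.add(f.get()); // blocks until THIS future is done, even if later ones already are
            }
            return seen;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readInSubmissionOrder()); // prints [10, 20, 10, 20]
    }
}
```

The results always come out in list order, because the loop cannot move past a slow future until it is done, no matter how early the fast ones finished.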

To solve this, you can react to each result as it completes instead of blocking on it. The CompletableFuture API is designed for this declarative, callback-based style:

ExecutorService executor = Executors.newFixedThreadPool(10);

Supplier<Integer> call1Supplier = () -> {
    System.out.println("Calling 1");
    try {
        Thread.sleep(100000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 10;
};

Supplier<Integer> call2Supplier = () -> {
    System.out.println("Calling 2");
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 20;
};

These suppliers can then be submitted to the same executor service through CompletableFuture, which lets you attach a callback that runs when each task completes.

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    // Even indices run the slow task, odd indices the fast one.
    Supplier<Integer> supplier = (i % 2 == 0) ? call1Supplier : call2Supplier;
    CompletableFuture<Void> future =
            CompletableFuture.supplyAsync(supplier, executor)
            .thenAccept(number -> System.out.println("From future is " + number));
    futures.add(future);
}

The following just makes the current thread wait until all async tasks have completed. In a long-running application, such as a server, it may be unnecessary.

for (CompletableFuture<Void> future : futures) {
    future.join();
}
// The pool's threads are non-daemon, so shut it down to let the JVM exit.
executor.shutdown();
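Putting the pieces together, here is a self-contained sketch of the same approach that records results in a list instead of printing them, so the completion order is easy to inspect. The shortened sleep times, the pool size of 4, and the names CompletionOrderDemo, sleepThenReturn, and collectInCompletionOrder are assumptions made for the demo:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class CompletionOrderDemo {
    // Hypothetical helper: a supplier that sleeps for the given time, then returns the value.
    static Supplier<Integer> sleepThenReturn(long millis, int value) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            return value;
        };
    }

    static List<Integer> collectInCompletionOrder() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Thread-safe list: callbacks may run on different threads.
        List<Integer> seen = new CopyOnWriteArrayList<>();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                Supplier<Integer> task = (i % 2 == 0)
                        ? sleepThenReturn(500, 10)   // slow task
                        : sleepThenReturn(10, 20);   // fast task
                futures.add(CompletableFuture.supplyAsync(task, executor).thenAccept(seen::add));
            }
            // Wait for every callback to finish before reading the list.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return new ArrayList<>(seen);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectInCompletionOrder());
    }
}
```

With these timings the fast results (20) should be recorded before the slow ones (10), since each callback fires when its own task completes rather than when the list is iterated.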

Note that I inlined the code from Call1 and Call2 because the Callable implementations aren't necessary here. It can still be a good idea to keep that logic in separate classes, unless lambdas like these are enough for your needs.
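If you would rather keep the Callable classes, an alternative not covered above is java.util.concurrent.ExecutorCompletionService, whose take() method hands back futures in completion order. The sketch below is only an illustration under assumptions: SleepingCall is a hypothetical stand-in for Call1 and Call2 with shortened sleeps, and CompletionServiceDemo and takeInCompletionOrder are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionServiceDemo {
    // Hypothetical stand-in for Call1/Call2 with configurable sleep and result.
    static class SleepingCall implements Callable<Integer> {
        private final long millis;
        private final int result;

        SleepingCall(long millis, int result) {
            this.millis = millis;
            this.result = result;
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(millis);
            return result;
        }
    }

    static List<Integer> takeInCompletionOrder() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
            int submitted = 4;
            for (int i = 0; i < submitted; i++) {
                service.submit(i % 2 == 0 ? new SleepingCall(500, 10) : new SleepingCall(10, 20));
            }
            List<Integer> seen = new ArrayList<>();
            for (int i = 0; i < submitted; i++) {
                // take() blocks until the NEXT task to finish is available, whichever it is.
                seen.add(service.take().get());
            }
            return seen;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(takeInCompletionOrder());
    }
}
```

This keeps the plain Future and Callable types from the question. The trade-off is that the consuming loop still blocks, just on "whichever task finishes next" rather than on a specific future.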

Upvotes: 1
