Alexey Yahno

Reputation: 41

How to use parallel processing in the most efficient and elegant way in Java

I have several data sources that I want to query in parallel, because each request is an HTTP call and may be fairly time-consuming. However, I'm going to use only one of the responses, so I prioritize them: if the first response is invalid, I check the second one; if that is also invalid, I use the third, and so on. But I want to stop processing and return the result as soon as I receive the first valid response.

To simulate the problem, I wrote the following code, which uses Java parallel streams. The problem is that I get the final result only after all requests have been processed.

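import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ParallelExecution {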

    private static Supplier<Optional<Integer>> testMethod(String strInt) {
        return () -> {
            Optional<Integer> result = Optional.empty();
            try {
                result = Optional.of(Integer.valueOf(strInt));
                System.out.printf("converted string %s to int %d\n",
                        strInt,
                        result.orElse(null));
            } catch (NumberFormatException ex) {
                System.out.printf("CANNOT CONVERT %s to int\n", strInt);
            }

            try {
                int randomValue = result.orElse(10000);
                TimeUnit.MILLISECONDS.sleep(randomValue);
                System.out.printf("converted string %s to int %d in %d milliseconds\n",
                        strInt,
                        result.orElse(null), randomValue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        System.out.println("Starting program: " + start.toString());
        List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList<>();
        for (String arg: args) {
            listOfFunctions.add(testMethod(arg));
        }
        Integer value = listOfFunctions.parallelStream()
                .map(function -> function.get())
                .filter(optValue -> optValue.isPresent()).map(val-> {
                    System.out.println("************** VAL: " + val);
                    return val;
                }).findFirst().orElse(null).get();
        Instant end = Instant.now();
        Long diff = end.toEpochMilli() - start.toEpochMilli();
        System.out.println("final value:" + value + ", worked during " + diff + "ms");
    }
}

So when I execute the program using the following command:

$ java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767

I want to get the result "34" as soon as possible (after roughly 34 milliseconds), but in fact I have to wait more than 10 seconds.

Could you help me find the most efficient solution to this problem?

Upvotes: 4

Views: 1080

Answers (3)

Ouney

Reputation: 1194

I tried it using CompletableFuture and its anyOf method, which completes as soon as any one of the futures completes. The key to stopping the other tasks is to give the CompletableFutures your own executor service and to shut it down once the result arrives.

  public static void main(String[] args) {
    Instant start = Instant.now();
    System.out.println("Starting program: " + start.toString());
    @SuppressWarnings("unchecked")
    CompletableFuture<Optional<Integer>>[] completableFutures = new CompletableFuture[args.length];
    // One non-daemon thread per request, so every call starts immediately
    ExecutorService es = Executors.newFixedThreadPool(args.length, r -> {
        Thread t = new Thread(r);
        t.setDaemon(false);
        return t;
    });

    for (int i = 0; i < args.length; i++) {
        completableFutures[i] = CompletableFuture.supplyAsync(testMethod(args[i]), es);
    }
    // anyOf completes with the value of whichever future finishes first
    CompletableFuture.anyOf(completableFutures)
            .thenAccept(res -> {
                System.out.println("Result - " + res + ", Time Taken : "
                        + (Instant.now().toEpochMilli() - start.toEpochMilli()));
                es.shutdownNow(); // interrupt the requests that are still running
            });
  }

PS: The interrupted tasks will throw InterruptedException, which you can catch and ignore instead of printing the stack trace. Also, the thread pool size should ideally equal the length of the args array.
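Note that anyOf completes with whichever future finishes first, even if its value is Optional.empty(), and it does not follow the priority order described in the question. A minimal sketch of a priority-aware variant, assuming all requests should still start at once: the futures are chained with thenCompose so that a source is consulted only if every higher-priority source returned empty. The fetch helper is a hypothetical, simplified stand-in for testMethod (invalid inputs sleep 500 ms instead of 10 s), and the class and method names are illustrative.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PriorityFallback {

    // Hypothetical stand-in for testMethod: valid numbers sleep for their own
    // value in ms, invalid strings sleep 500 ms and yield Optional.empty().
    static Supplier<Optional<Integer>> fetch(String s) {
        return () -> {
            Optional<Integer> result;
            try {
                result = Optional.of(Integer.valueOf(s));
            } catch (NumberFormatException e) {
                result = Optional.empty();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(result.orElse(500));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: give up quietly
                return Optional.empty();
            }
            return result;
        };
    }

    // Starts all requests at once, then walks the results in priority order:
    // the first present value wins, and a later source is used only if every
    // earlier one turned out to be empty.
    static Optional<Integer> firstValidByPriority(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(Math.max(1, args.length));
        try {
            @SuppressWarnings("unchecked")
            CompletableFuture<Optional<Integer>>[] futures = new CompletableFuture[args.length];
            for (int i = 0; i < args.length; i++) {
                futures[i] = CompletableFuture.supplyAsync(fetch(args[i]), es);
            }
            CompletableFuture<Optional<Integer>> chain =
                    CompletableFuture.completedFuture(Optional.empty());
            for (CompletableFuture<Optional<Integer>> next : futures) {
                // Keep a found value; otherwise fall through to the next source
                chain = chain.thenCompose(r -> r.isPresent()
                        ? CompletableFuture.completedFuture(r)
                        : next);
            }
            return chain.join();
        } finally {
            es.shutdownNow(); // interrupt requests that are still running
        }
    }

    public static void main(String[] args) {
        String[] input = {"dfafj", "34", "1341", "4656"};
        System.out.println(firstValidByPriority(input)); // Optional[34]
    }
}
```

With the input {"200", "34"} this sketch returns 200 rather than 34, because the higher-priority source wins even though it answers later; whether that or the fastest valid answer is wanted depends on how strict the priority really is.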

Upvotes: 0

Andrew

Reputation: 49606

ExecutorService#invokeAny looks like a good option.

List<Callable<Optional<Integer>>> tasks = listOfFunctions
    .stream()
    .<Callable<Optional<Integer>>>map(f -> f::get)
    .collect(Collectors.toList());

ExecutorService service = Executors.newCachedThreadPool();
Optional<Integer> value = service.invokeAny(tasks);

service.shutdown();

I converted your List<Supplier<Optional<Integer>>> into a List<Callable<Optional<Integer>>> so that it can be passed to invokeAny; you could also build Callables from the start. Then I created an ExecutorService and submitted the tasks.

invokeAny returns the result of the first task that completes successfully (that is, without throwing an exception) as soon as that task finishes. The remaining tasks are cancelled, which interrupts them.
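One caveat: invokeAny counts a task that returns Optional.empty() as a success. In the question's data the invalid entries are simply slower, so the first result happens to be valid, but if an invalid source answered first, its empty result would be returned. A minimal sketch of one workaround, assuming it is acceptable to turn an invalid response into an exception so that invokeAny skips it; the parse helper and class name are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class InvokeAnyValid {

    // Hypothetical request: invalid input throws, so invokeAny does not
    // treat it as a successful result and keeps waiting for another task.
    static Callable<Integer> parse(String s) {
        return () -> {
            int value = Integer.parseInt(s);     // NumberFormatException if invalid
            TimeUnit.MILLISECONDS.sleep(value);  // simulate a slow HTTP call
            return value;
        };
    }

    static Integer firstValid(String... args) throws InterruptedException {
        List<Callable<Integer>> tasks = Arrays.stream(args)
                .map(InvokeAnyValid::parse)
                .collect(Collectors.toList());
        ExecutorService service = Executors.newCachedThreadPool();
        try {
            // Result of the first task that completes without throwing;
            // tasks that have not completed are cancelled on return.
            return service.invokeAny(tasks);
        } catch (ExecutionException e) {
            return null; // every task threw, i.e. no valid response
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstValid("dfafj", "34", "1341", "245df")); // 34
    }
}
```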

You may also want to look into CompletionService.

List<Callable<Optional<Integer>>> tasks = Arrays
    .stream(args)
    .<Callable<Optional<Integer>>>map(arg -> () -> testMethod(arg).get())
    .collect(Collectors.toList());

final ExecutorService underlyingService = Executors.newCachedThreadPool();
final ExecutorCompletionService<Optional<Integer>> service = new ExecutorCompletionService<>(underlyingService);
tasks.forEach(service::submit);

Optional<Integer> value = service.take().get();
underlyingService.shutdownNow();
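service.take() returns the first task to finish, which may be one that produced Optional.empty(). To keep waiting until a valid result arrives, you can call take() once per submitted task and stop at the first present value. A sketch assuming the same task shape (List<Callable<Optional<Integer>>>); the firstPresent method and class name are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FirstPresent {

    // Submits all tasks and returns the first non-empty result in completion
    // order, or Optional.empty() if every task produced an empty result.
    static Optional<Integer> firstPresent(List<Callable<Optional<Integer>>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService underlying = Executors.newCachedThreadPool();
        CompletionService<Optional<Integer>> service =
                new ExecutorCompletionService<>(underlying);
        try {
            tasks.forEach(service::submit);
            for (int i = 0; i < tasks.size(); i++) {
                // Blocks until the next task finishes, in completion order
                Optional<Integer> value = service.take().get();
                if (value.isPresent()) {
                    return value;
                }
            }
            return Optional.empty();
        } finally {
            underlying.shutdownNow(); // interrupt tasks that are still running
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Optional<Integer>>> tasks = Arrays.asList(
                () -> Optional.empty(),                               // invalid, finishes at once
                () -> { Thread.sleep(50); return Optional.of(34); },  // first valid result
                () -> { Thread.sleep(2000); return Optional.of(1341); });
        System.out.println(firstPresent(tasks)); // Optional[34]
    }
}
```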

Upvotes: 2

daniu

Reputation: 14999

You can use a queue to put your results in:

private static void testMethod(String strInt, BlockingQueue<Integer> queue) {
    // same parsing and sleeping as your testMethod, but instead of returning
    // the result, publish it to the queue only if it is valid:
    result.ifPresent(queue::add);
}

and then call it with

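BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
for (String s : args) {
    CompletableFuture.runAsync(() -> testMethod(s, queue));
}
Integer result = queue.take(); // blocks until the first valid result arrives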

Note that this only handles the first valid result, as in your sample.
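Filled out, the queue approach might look like the sketch below. Two assumptions worth flagging: CompletableFuture.runAsync without an executor runs on the common ForkJoinPool, whose parallelism is typically one less than the number of cores, so this sketch passes a dedicated executor to make every request start at once; and it uses poll with a timeout (the 5-second value is arbitrary) instead of take, so the caller does not block forever when no input is valid. The class name and firstResult method are illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueFirstResult {

    // Parses and "fetches" one value; only valid results are published.
    static void testMethod(String strInt, BlockingQueue<Integer> queue) {
        try {
            int value = Integer.parseInt(strInt);
            TimeUnit.MILLISECONDS.sleep(value); // simulate the slow call
            queue.add(value);
        } catch (NumberFormatException e) {
            // invalid response: publish nothing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // cancelled after a winner was found
        }
    }

    static Integer firstResult(String... args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            for (String s : args) {
                CompletableFuture.runAsync(() -> testMethod(s, queue), es);
            }
            // First valid value, or null if none arrives within 5 seconds
            return queue.poll(5, TimeUnit.SECONDS);
        } finally {
            es.shutdownNow(); // interrupt the requests that are still sleeping
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstResult("dfafj", "34", "1341", "4656")); // 34
    }
}
```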

Upvotes: 0
