Reputation: 6126
I have 2 methods that have different return types that I want to run at the same time. Here is my code:
public void method(int id) throws InterruptedException, ExecutionException {
final CompletableFuture<List<FooA>> fooACF = CompletableFuture.supplyAsync(() -> generateFooA(id));
final CompletableFuture<List<FooB>> fooBCF = CompletableFuture.supplyAsync(() -> generateFooB(id));
List<FooA> fooAs = fooACF.get();
List<FooB> fooBs = fooBCF.get();
//Do more processing
}
public List<FooA> generateFooA(int id) {
//code
}
public List<FooB> generateFooB(int id) {
//code
}
But I don't know if both methods will run in parallel with the above code or if I'm just better off saying:
List<FooA> fooAs = generateFooA(id);
List<FooB> fooBs = generateFooB(id);
How do I use CompletableFuture properly to run both methods in parallel?
Upvotes: 6
Views: 24291
Reputation: 1928
You can do this in Java 8+ with the Stream API.
For example, suppose we have a calculation like this:
result = (a + b) + (a - c) + (c * b)
We split the calculation into several methods:
public class Calculator {
public static int add(int a, int b) {
sleep(); // Imagine this calculation takes several seconds
return a + b;
}
public static int minus(int a, int b) {
sleep(); // Imagine this calculation takes several seconds
return a - b;
}
// Note: despite its name, this method multiplies, matching the (c * b) term above
public static int divide(int a, int b) {
sleep(); // Imagine this calculation takes several seconds
return a * b;
}
private static void sleep() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Note: each method sleeps for 2 seconds to simulate a long-running calculation.
Legacy (sequential) code style:
int a = 12;
int b = 4;
int c = 1;
LocalTime startDateTime = LocalTime.now();
int legacyCalculate = Calculator.add(a, b) + Calculator.minus(a, c) + Calculator.divide(c, b);
System.out.println("Result : " + legacyCalculate);
LocalTime endDateTime = LocalTime.now();
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");
// Result :
Result : 31
Process time : 6 seconds
Parallel computation with an ExecutorService (or plain threads):
ExecutorService es = Executors.newCachedThreadPool();
LocalTime startDateTime = LocalTime.now();
Future<Integer> r1 = es.submit(() -> Calculator.add(a, b));
Future<Integer> r2 = es.submit(() -> Calculator.minus(a, c));
Future<Integer> r3 = es.submit(() -> Calculator.divide(c, b));
System.out.println("Result : " + (r1.get() + r2.get() + r3.get()));
LocalTime endDateTime = LocalTime.now();
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");
es.shutdown();
// Result:
Result : 31
Process time : 2 seconds
Parallel computation with a Java 8+ parallel stream:
Supplier<Integer> r1 = () -> Calculator.add(a, b);
Supplier<Integer> r2 = () -> Calculator.minus(a, c);
Supplier<Integer> r3 = () -> Calculator.divide(c, b);
LocalTime startDateTime = LocalTime.now();
int result = Stream.of(r1, r2, r3)
.parallel() // Key line: evaluates the suppliers in parallel using the common ForkJoinPool
.mapToInt(Supplier::get)
.sum();
LocalTime endDateTime = LocalTime.now();
System.out.println("Result : " + result);
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");
// Result:
Result : 31
Process time : 2 seconds
Upvotes: 0
Reputation: 41261
Your code works fine, using threads supplied by the ForkJoinPool.commonPool(), as promised by the JavaDoc for CompletableFuture.supplyAsync(Supplier<U> supplier). You can prove it in a quick-and-dirty manner by adding some sleep() and println() statements. I've simplified your code a bit by using String instead of List<Foo>:
public void method(int id) throws InterruptedException, ExecutionException {
CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(id));
CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(id));
String fooA = cfa.get();
String fooB = cfb.get();
System.out.println("Final fooA " + fooA);
System.out.println("Final fooB " + fooB);
}
public String generateA(int id) {
System.out.println("Entering generateA " + Thread.currentThread());
sleep(2000);
System.out.println("Leaving generateA");
return "A" + id;
}
public String generateB(int id) {
System.out.println("Entering generateB " + Thread.currentThread());
sleep(1000);
System.out.println("Leaving generateB");
return "B" + id;
}
private void sleep(int n) {
try {
Thread.sleep(n);
} catch (InterruptedException ex) {
// Restore the interrupt flag rather than silently swallowing it
Thread.currentThread().interrupt();
}
}
Output is:
Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Entering generateB Thread[ForkJoinPool.commonPool-worker-2,5,main]
Leaving generateB
Leaving generateA
Final fooA A1
Final fooB B1
You can manually observe that the "Leaving" output lines appear after 1 second and 2 seconds. For more evidence you could add timestamps to the output. If you change the relative lengths of the sleeps, you'll see the "Leaving" output appear in a different order.
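The timestamp idea can be sketched roughly as follows. This is only an illustration: the class name TimestampDemo, the log helper, the EVENTS list and the shortened sleeps (600 ms and 300 ms instead of 2 s and 1 s) are assumptions made for this sketch, not part of the code above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class TimestampDemo {
    private static final long START = System.nanoTime();
    // Thread-safe list so the order of events can be inspected afterwards
    static final List<String> EVENTS = new CopyOnWriteArrayList<>();

    // Prints and records an event with the elapsed time and the current thread name
    static void log(String event) {
        long elapsedMs = (System.nanoTime() - START) / 1_000_000;
        EVENTS.add(event);
        System.out.printf("%5d ms [%s] %s%n", elapsedMs, Thread.currentThread().getName(), event);
    }

    // Hypothetical stand-in for generateA / generateB with a configurable sleep
    static String generate(String name, int id, long sleepMs) {
        log("Entering generate" + name);
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        log("Leaving generate" + name);
        return name + id;
    }

    static String run(int id) {
        CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generate("A", id, 600));
        CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generate("B", id, 300));
        // join() behaves like get() but throws an unchecked exception on failure
        return cfa.join() + " " + cfb.join();
    }

    public static void main(String[] args) {
        System.out.println("Final: " + run(1));
    }
}
```

If the two tasks really overlap, both "Entering" lines should carry nearly the same timestamp, and "Leaving generateB" should be logged before "Leaving generateA".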
If you omit the sleep() calls, then it's entirely likely that the first task will complete so quickly that it has finished before the second starts:
Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateA
Entering generateB Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateB
Final fooA A1
Final fooB B1
Notice that it all happened so quickly that the thread had been returned to the pool by the time the runtime asked for the second thread, so the original thread was reused.
This might conceivably also happen for a very short sleep, although on my system a 1ms sleep was enough to get two separate threads every time I ran it. Of course the sleep() is a placeholder for a "real" operation that takes time to complete. If your real operation is so cheap that it finishes before the other thread starts, that's a good hint that this is a scenario in which multi-threading is not beneficial.
However if you need to ask how to prove that things are happening concurrently, I wonder why you want them to happen concurrently in the first place. If there's no "real world" observable difference between your program when it's doing these tasks concurrently, or sequentially, then why not leave it running sequentially? It's easier to reason about sequential operations; there are lots of sneaky bugs associated with concurrency.
Perhaps you're hoping for a speed increase by multi-threading -- if so the increase in speed should be what you're measuring, not whether or not things are actually concurrent. And bear in mind that for an awful lot of tasks, a CPU can't perform them faster in parallel than in sequence.
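A rough sketch of measuring the speed-up itself, rather than the concurrency, might look like this. The class name SpeedupDemo, the String and Integer element types standing in for FooA and FooB, the 300 ms sleeps and the describe helper are all assumptions made for illustration. It also uses thenCombine, which is one way to merge two futures with different result types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class SpeedupDemo {
    // Stand-ins for generateFooA / generateFooB; types and sleep times are assumptions
    static List<String> generateFooA(int id) {
        pause(300);
        return Arrays.asList("A" + id);
    }

    static List<Integer> generateFooB(int id) {
        pause(300);
        return Arrays.asList(id);
    }

    static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The "more processing" step: here it just formats both results
    static String describe(List<String> fooAs, List<Integer> fooBs) {
        return fooAs + " " + fooBs;
    }

    static String sequential(int id) {
        return describe(generateFooA(id), generateFooB(id));
    }

    static String concurrent(int id) {
        CompletableFuture<List<String>> a = CompletableFuture.supplyAsync(() -> generateFooA(id));
        CompletableFuture<List<Integer>> b = CompletableFuture.supplyAsync(() -> generateFooB(id));
        // thenCombine runs describe once both futures have completed
        return a.thenCombine(b, SpeedupDemo::describe).join();
    }

    // Returns the wall-clock time in milliseconds taken by the given work
    static long timeMillis(Supplier<String> work) {
        long start = System.nanoTime();
        work.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + timeMillis(() -> sequential(1)) + " ms");
        System.out.println("Concurrent: " + timeMillis(() -> concurrent(1)) + " ms");
    }
}
```

With sleeps as the workload the concurrent version should take roughly half as long. With CPU-bound or very cheap real work the gap may shrink or disappear, which is exactly what this kind of measurement is meant to reveal.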
Upvotes: 19
Reputation: 3187
As I said in my comment, check out "How to start two threads at 'exactly' the same time", but this should be what you're looking for:
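// Requires java.util.concurrent.* and java.util.concurrent.atomic.AtomicReference
final CyclicBarrier gate = new CyclicBarrier(3);
public void method(int id) throws InterruptedException, BrokenBarrierException {
    // Holders so the results are visible outside the lambdas
    final AtomicReference<List<FooA>> fooAs = new AtomicReference<>();
    final AtomicReference<List<FooB>> fooBs = new AtomicReference<>();
    Thread one = new Thread(() -> {
        awaitGate();
        fooAs.set(generateFooA(id));
    });
    Thread two = new Thread(() -> {
        awaitGate();
        fooBs.set(generateFooB(id));
    });
    one.start();
    two.start();
    gate.await(); // third party arrives: releases both threads at (nearly) the same moment
    one.join(); // wait for both results before continuing
    two.join();
    //Do more processing with fooAs.get() and fooBs.get()
}
// await() throws checked exceptions, which a Runnable lambda cannot propagate
private void awaitGate() {
    try {
        gate.await();
    } catch (InterruptedException | BrokenBarrierException e) {
        throw new IllegalStateException(e);
    }
}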
public List<FooA> generateFooA(int id) {
//code
}
public List<FooB> generateFooB(int id) {
//code
}
Upvotes: -1
Reputation: 425238
You're missing an Executor:
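ExecutorService executor = Executors.newCachedThreadPool();
// Runnable discards the return values; submit Callables instead if you need the results
List<Future<?>> futures = Stream.<Runnable>of(() -> generateFooA(id), () -> generateFooB(id))
        .<Future<?>>map(executor::submit)
        .collect(Collectors.toList());
for (Future<?> future : futures) {
    future.get(); // do whatever you need here
}
executor.shutdown(); // let the pool's threads exit once the tasks are done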
The Runnables start executing when you submit them. Each get() returns as soon as its task has finished; e.g. if the first future you call get() on is the slowest, all the other get() calls will return immediately.
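Since the question needs two different result types back, one possible variation is to submit value-returning lambdas, which are treated as Callables, so that each Future keeps its own type. In this sketch the class name TypedFuturesDemo and the String and Integer element types standing in for FooA and FooB are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TypedFuturesDemo {
    // Stand-ins for generateFooA / generateFooB; the element types are assumptions
    static List<String> generateFooA(int id) {
        return Arrays.asList("A" + id);
    }

    static List<Integer> generateFooB(int id) {
        return Arrays.asList(id, id + 1);
    }

    static String method(int id) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Both tasks start running as soon as they are submitted
            Future<List<String>> fooAFuture = executor.submit(() -> generateFooA(id));
            Future<List<Integer>> fooBFuture = executor.submit(() -> generateFooB(id));
            // get() blocks until the corresponding task has finished
            List<String> fooAs = fooAFuture.get();
            List<Integer> fooBs = fooBFuture.get();
            return fooAs + " " + fooBs;
        } finally {
            executor.shutdown(); // always release the pool's threads
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(method(1)); // prints "[A1] [1, 2]"
    }
}
```

Keeping one typed Future per task avoids the List<Future<?>> wildcard, at the cost of not being able to loop over the futures generically.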
Upvotes: -1