Reputation: 1279
I am trying to run an expensive, variable-length task on elements that arrive sequentially. Element order must be preserved, but each element should still be processed as quickly as possible.
Below is an SSCWE (W for wrong!) showing my attempt at parallelizing the processing. Is there a way to ensure that each call to processSomething() executes in its own thread while still getting the results back in FIFO order when I read them from the ExecutorCompletionService?
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadingHelper {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(20);
ExecutorCompletionService<String> compService
= new ExecutorCompletionService<String>(pool);
// process some data
processSomething(compService, "1.");
processSomething(compService, "2..");
processSomething(compService, "3...");
processSomething(compService, "4....");
processSomething(compService, "5.....");
// print out the processed data
try {
System.out.println(compService.take().get());
System.out.println(compService.take().get());
System.out.println(compService.take().get());
System.out.println(compService.take().get());
System.out.println(compService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static synchronized void processSomething(
CompletionService<String> compService, final String x) {
Callable<String> c = new Callable<String>() {
@Override
public String call() throws Exception {
// this represents the variable and expensive
// amount of time it takes to process x
long rand = (long) (Math.random() * 100);
Thread.sleep(rand);
// this represents the processing of x
String xProcessed = x.replace(".", "!");
return xProcessed;
}
};
compService.submit(c);
}
}
Typical output is
4!!!!
2!!
1!
5!!!!!
3!!!
but I want
1!
2!!
3!!!
4!!!!
5!!!!!
Upvotes: 0
Views: 1277
Reputation: 50034
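Use Futures instead of the CompletionService. A CompletionService hands results back in the order the tasks complete, whereas a list of Futures can be read in the order the tasks were submitted, so you get the results in FIFO order and still benefit from parallel execution: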
ExecutorService pool = Executors.newFixedThreadPool(20);
List<Future<String>> futures = new ArrayList<>();
futures.add(pool.submit(makeCallable("1.")));
// ...
for (Future<String> future : futures) {
try {
System.out.println(future.get());
} catch (...) {
...
}
}
public static Callable<String> makeCallable(String x) {
Callable<String> c = ...;
return c;
}
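For reference, here is one way the outline above might be filled in as a complete program. This is a minimal sketch, not the only way to do it: the class name OrderedProcessing and the helper processInOrder are hypothetical names introduced for illustration, and the body of makeCallable simply reuses the simulated work from the question's processSomething().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class name, used only for this sketch.
class OrderedProcessing {

    // Same simulated work as processSomething() in the question:
    // sleep for a random time, then replace every "." with "!".
    static Callable<String> makeCallable(final String x) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep((long) (Math.random() * 100));
                return x.replace(".", "!");
            }
        };
    }

    // Hypothetical helper: submits every input, then collects the
    // results in submission order rather than completion order.
    static List<String> processInOrder(List<String> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            // Submit everything first so the tasks run in parallel.
            List<Future<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                futures.add(pool.submit(makeCallable(input)));
            }
            // get() blocks until that particular task is done, so
            // walking the list front to back preserves FIFO order.
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            // Without this, the pool's threads keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> inputs = Arrays.asList("1.", "2..", "3...", "4....", "5.....");
        for (String s : processInOrder(inputs)) {
            System.out.println(s);
        }
    }
}
```

The trade-off is that a slow task at the head of the list delays printing of later results that have already finished, but no work is lost: the later tasks keep running in the pool while get() waits on the earlier one.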
Upvotes: 2