chessofnerd

Reputation: 1279

Maintaining FIFO when using threads in Java

I am attempting to perform an expensive, variable-length task on elements that arrive sequentially. It is imperative that element order is maintained while each element is still processed quickly.

Below is an SSCWE (W for wrong!), my attempt at parallelizing the processing. Is there a way to ensure that each call to processSomething() executes in its own thread while still getting the results back in FIFO order when I read from the ExecutorCompletionService?

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadingHelper {

public static void main(String[] args) {

    ExecutorService pool = Executors.newFixedThreadPool(20);
    ExecutorCompletionService<String> compService 
        = new ExecutorCompletionService<String>(pool);

    // process some data
    processSomething(compService, "1.");
    processSomething(compService, "2..");
    processSomething(compService, "3...");
    processSomething(compService, "4....");
    processSomething(compService, "5.....");

    // print out the processed data
    try {
        System.out.println(compService.take().get());
        System.out.println(compService.take().get());
        System.out.println(compService.take().get());
        System.out.println(compService.take().get());
        System.out.println(compService.take().get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

}

public static synchronized void processSomething(
        CompletionService<String> compService, final String x) {
    Callable<String> c = new Callable<String>() {

        @Override
        public String call() throws Exception {
            // this represents the variable and expensive
            // amount of time it takes to process x
            long rand = (long) (Math.random() * 100);
            Thread.sleep(rand);

            // this represents the processing of x
            String xProcessed = x.replace(".", "!");

            return xProcessed;
        }
    };
    compService.submit(c);
}
}

Typical output is

4!!!!
2!!
1!
5!!!!!
3!!!

but I want

1!
2!!
3!!!
4!!!!
5!!!!!

Upvotes: 0

Views: 1277

Answers (1)

nosid

Reputation: 50034

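An ExecutorCompletionService hands results back in the order the tasks complete, which is why your output is shuffled. Use Futures instead of the CompletionService: keep them in a list in submission order and call get() on each in turn. You get the results in that order and still benefit from parallel execution: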

ExecutorService pool = Executors.newFixedThreadPool(20);
List<Future<String>> futures = new ArrayList<>();
futures.add(pool.submit(makeCallable("1.")));
// ...
for (Future<String> future : futures) {
    try {
        System.out.println(future.get());
    } catch (...) {
        ...
    }
}

public static Callable<String> makeCallable(String x) {
    Callable<String> c = ...;
    return c;
}
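
For reference, here is one way the fragments above might be filled in as a complete program. This is only a sketch under a few assumptions: the class name OrderedFutures and the method processInOrder are hypothetical, the body of makeCallable simply reuses the simulated work from the question, and the lambda syntax needs Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

class OrderedFutures {

    // Assumed body: the same simulated work as the question's processSomething().
    static Callable<String> makeCallable(final String x) {
        return () -> {
            // stands in for the variable, expensive processing time
            Thread.sleep(ThreadLocalRandom.current().nextLong(100));
            // stands in for the processing of x
            return x.replace(".", "!");
        };
    }

    // Hypothetical helper: submits every input, then collects the results
    // in submission order rather than completion order.
    static List<String> processInOrder(List<String> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String input : inputs) {
                futures.add(pool.submit(makeCallable(input)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // get() blocks until this particular task has finished;
                // later tasks keep running in the pool in the meantime
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> inputs = Arrays.asList("1.", "2..", "3...", "4....", "5.....");
        for (String s : processInOrder(inputs)) {
            System.out.println(s);
        }
    }
}
```

All tasks are submitted before the first get() call, so they run concurrently. The loop only waits on whichever task is next in line, so a slow early element delays the printing of later ones but not their processing.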

Upvotes: 2
