Sam Fisher

Reputation: 848

How to execute threads with ExecutorService in concrete order?

My program has several commands, which are run by a command executor class. I need to execute 4 commands one by one, in order (so that users are not listed before I create a new one), using ExecutorService.

Execution environment:

public class ConcurrentCommandExecutionEnvironment {
    private static final int POOL_SIZE = 4;

    private static final Logger log = Logger.getLogger(ConcurrentCommandExecutionEnvironment.class);

    public void readArgsAndExecuteCommand(String[] props) {
        if (props.length == 0) {
            throw new IllegalArgumentException("Error: no params entered");
        }

        ExecutorService execService = new ThreadPoolExecutor(
                POOL_SIZE, // core pool size
                POOL_SIZE, // maximum pool size
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
        );
        ReentrantLock lock = new ReentrantLock();

        CommandStore commandStore = new CommandStore();
        CommandExecutor commandExecutor = new CommandExecutor(
                new CreateUserCommand(commandStore),
                new GetUserListCommand(commandStore),
                new CreateTaskCommand(commandStore),
                new GetTasksListCommand(commandStore),
                new GetTaskByUsernameCommand(commandStore),
                new CompleteTaskCommand(commandStore),
                new DeleteUserCommand(commandStore),
                new CreateUserAndTaskCommand(commandStore)
        );

        execService.execute(() -> {
            commandExecutor.createUserAndTask(props);
        });

        execService.execute(() -> {
            commandExecutor.getUsers(props);
        });

        execService.execute(() -> {
            commandExecutor.getTasks(props);
        });

        execService.shutdown();
    }
}

I haven't worked with ExecutorService before; until now I synchronized threads with the "synchronized" keyword. Can I use it here, with the commandExecutor instance as a mutex that every task synchronizes on, as in the example below?

execService.execute(() -> {
    synchronized (commandExecutor) {
        commandExecutor.createUserAndTask(props);
    }
});

Or should I proceed differently with ExecutorService?

Upvotes: 0

Views: 681

Answers (2)

AnatolyG

Reputation: 1587

If you aren't interested in naturally sequential execution with Executors.newSingleThreadExecutor(), or in doing without an ExecutorService at all :), then let's look at a simplified illustration of what an ExecutorService is (here, Executors.newFixedThreadPool(4)):

final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

final int nThreads = 4;
for (int i = 0; i < nThreads; i++) {
    new Thread(() -> {
        while (true) {
            try {
                tasks.take().run();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }).start();
}

tasks.offer(() -> System.out.println("Task 1"));
tasks.offer(() -> System.out.println("Task 2"));
tasks.offer(() -> System.out.println("Task 3"));

As you can see, there is no out-of-the-box tooling for synchronization or communication between tasks or threads. We cannot manage the threads directly, but we can manage our tasks, and that affects how the threads execute them. For instance:

  1. Submit the next task after the previous one has finished. This is quite a natural solution and is sometimes very useful (especially for scheduled execution): you keep submitting/scheduling tasks while a condition holds. Do your logic, check the condition, and submit/schedule the next task if the condition is true. This naturally guarantees that only one task runs at a time (with any type of Executor). Here is an example of how it could be done:
    public class CommandChain implements Runnable {
        public static CommandChain start(final ExecutorService executor, final Runnable command) {
            return new CommandChain(executor, command);
        }

        private final ExecutorService executor;
        private final Runnable command;
        private CommandChain then;

        private CommandChain(final ExecutorService executor, final Runnable command) {
            this.executor = executor;
            this.command = command;
        }

        public CommandChain then(final Runnable command) {
            then = new CommandChain(this.executor, command);
            return then;
        }

        @Override
        public void run() {
            command.run();
            if (then != null) {
                executor.submit(then);
            }
        }
    }

    final ExecutorService executor = Executors.newFixedThreadPool(4);

    final CommandChain command1 = CommandChain.start(executor, () -> System.out.println("Command 1"));
    command1.then(() -> System.out.println("Command 2"))
            .then(() -> System.out.println("Command 3"));
    executor.submit(command1);

This trick works with any type of ExecutorService, including a pooled one with any pool size, or just Executors.newSingleThreadExecutor().
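The same "start the next task only after the previous one has finished" idea can also be expressed with the JDK's CompletableFuture instead of a hand-written chain class. This is a swapped-in alternative, not part of the original answer; the class name CompletableFutureChainDemo and the log list are illustrative assumptions. A minimal sketch:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original answer.
class CompletableFutureChainDemo {
    // Runs three commands one after another on a 4-thread pool and
    // returns the order in which they actually executed.
    static List<String> runInOrder() {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        final List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    .runAsync(() -> log.add("Command 1"), executor)
                    // each stage is handed to the pool only after the previous stage completes
                    .thenRunAsync(() -> log.add("Command 2"), executor)
                    .thenRunAsync(() -> log.add("Command 3"), executor)
                    .join(); // block the caller until the last stage is done
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder());
    }
}
```

As with the chain above, no pool thread ever blocks waiting for another task, so the pool size does not matter.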

  2. Inside a task, wait until the previous one has finished, and notify the next task when this one is done. One of the most convenient ways to do such one-to-one waiting/notification is a CountDownLatch:
    public class CommandChain implements Runnable {
        public static CommandChain start(final Runnable command) {
            return new CommandChain(null, command);
        }

        private final CountDownLatch waitLatch;
        private final Runnable command;

        private CommandChain then;

        private CommandChain(final CountDownLatch waitLatch, final Runnable command) {
            this.waitLatch = waitLatch;
            this.command = command;
        }

        public CommandChain then(final Runnable command) {
            then = new CommandChain(new CountDownLatch(1), command);
            return then;
        }

        @Override
        public void run() {
            if (waitLatch != null) {
                try {
                    waitLatch.await();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // we get here because the ExecutorService
                    // is shutting down and interrupting
                    // its workers, so let's finish
                    // the execution
                }
            }
            command.run();
            if (then != null) {
                then.waitLatch.countDown();
            }
        }
    }

    final ExecutorService executor = Executors.newFixedThreadPool(4);

    final CommandChain command1 = CommandChain.start(() -> System.out.println("Command 1"));
    final CommandChain command2 = command1.then(() -> System.out.println("Command 2"));
    final CommandChain command3 = command2.then(() -> System.out.println("Command 3"));

    // the order doesn't matter
    executor.submit(command3);
    executor.submit(command1);
    executor.submit(command2);

The disadvantage of this solution is that you need enough threads to execute all the tasks in the worst case of blocking, where the chain is picked up from end to start: the last command calls waitLatch.await() first, then the command before it calls its await(), and so on. This means nThreads >= nTasks. Otherwise the execution can stall; just try Executors.newFixedThreadPool(1) with this example.
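To make the stall concrete, here is a reduced sketch with only two latch-ordered tasks submitted in reverse order. It does not reuse the CommandChain class; the class name LatchStarvationDemo, the method finishesInTime and the 500 ms timeout are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class LatchStarvationDemo {
    // Submits two latch-ordered tasks in reverse order and reports whether
    // both finished within the timeout for the given pool size.
    static boolean finishesInTime(final int poolSize) throws InterruptedException {
        final ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        final CountDownLatch firstDone = new CountDownLatch(1);

        // "Command 2" is submitted first and blocks until "Command 1" is done
        executor.submit(() -> {
            try {
                firstDone.await();
                System.out.println("Command 2");
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // the pool is being stopped
            }
        });
        // "Command 1" needs a free worker thread to run and release the latch
        executor.submit(() -> {
            System.out.println("Command 1");
            firstDone.countDown();
        });

        executor.shutdown(); // accept no new tasks; queued ones may still run
        final boolean finished = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
        executor.shutdownNow(); // interrupt the blocked worker, if there is one
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool of 2 finished: " + finishesInTime(2));
        System.out.println("pool of 1 finished: " + finishesInTime(1));
    }
}
```

With a pool of 1, the only worker is parked in await(), so "Command 1" stays in the queue and the latch is never released; with a pool of 2 the second worker should pick it up and both tasks complete.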

We can also manage the order of task/command execution from outside (in the main thread, for example) with the same kind of shared CountDownLatch, or with a CyclicBarrier, Phaser, etc.
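One possible shape of such outside coordination, assuming the submitting thread is allowed to block: it submits a task, waits on that task's CountDownLatch, and only then submits the next one. The class name ExternalOrderingDemo and the log list are illustrative assumptions, not part of the original answer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the original answer.
class ExternalOrderingDemo {
    // The calling thread enforces the order: it submits one task, waits for
    // that task's latch, and only then submits the next one.
    static List<String> runInOrder(final List<String> names) throws InterruptedException {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        final List<String> log = new CopyOnWriteArrayList<>();
        try {
            for (final String name : names) {
                final CountDownLatch done = new CountDownLatch(1);
                executor.submit(() -> {
                    try {
                        log.add(name); // stands in for the real command
                    } finally {
                        done.countDown(); // signal completion even if the command fails
                    }
                });
                done.await(); // coordinating thread blocks here, not a pool worker
            }
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder(List.of("Command 1", "Command 2", "Command 3")));
    }
}
```

Because only the coordinating thread waits, this variant does not need nThreads >= nTasks; the price is that the caller is blocked for the whole sequence.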

Upvotes: 1

Basil Bourque

Reputation: 338181

Executors.newSingleThreadExecutor

If you need multiple tasks to be run sequentially, submit them to a single-threaded executor service.

ExecutorService es = Executors.newSingleThreadExecutor() ;
…
es.submit( task1 ) ;
es.submit( task2 ) ;
es.submit( task3 ) ;
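A self-contained version of the snippet above, as a minimal sketch: the class name SingleThreadOrderDemo, the log list and the 5-second timeout are illustrative assumptions, and each task only records its name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class SingleThreadOrderDemo {
    // Submits three tasks to a single-threaded executor and returns the
    // order in which they ran.
    static List<String> runInOrder() throws InterruptedException {
        final ExecutorService es = Executors.newSingleThreadExecutor();
        final List<String> log = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final String name = "Task " + i;
            es.submit(() -> { log.add(name); }); // one worker thread, FIFO queue
        }
        es.shutdown(); // previously submitted tasks still run
        if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder());
    }
}
```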

Or reconsider whether you even need the executor service at all. If the original thread waits for the series of tasks to run sequentially in a single-threaded executor service, the original thread could just as well run the tasks itself. If you are only waiting on a single thread, there is no point in using threads.

Upvotes: 5
