Reputation: 848
My program has several commands that are executed by a command executor class. I need to execute 4 commands one by one, in order (for example, users must not be listed before a new one has been created), using ExecutorService.
Execution environment:
public class ConcurrentCommandExecutionEnvironment {
    private static final int POOL_SIZE = 4;
    private static final Logger log = Logger.getLogger(ConcurrentCommandExecutionEnvironment.class);
    public void readArgsAndExecuteCommand(String[] props) {
        if (props.length == 0) {
            throw new IllegalArgumentException("Error: no params entered");
        }
        ExecutorService execService = new ThreadPoolExecutor(
                POOL_SIZE,
                POOL_SIZE,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
        );
        ReentrantLock lock = new ReentrantLock();
        CommandStore commandStore = new CommandStore();
        CommandExecutor commandExecutor = new CommandExecutor(
                new CreateUserCommand(commandStore),
                new GetUserListCommand(commandStore),
                new CreateTaskCommand(commandStore),
                new GetTasksListCommand(commandStore),
                new GetTaskByUsernameCommand(commandStore),
                new CompleteTaskCommand(commandStore),
                new DeleteUserCommand(commandStore),
                new CreateUserAndTaskCommand(commandStore)
        );
        execService.execute(() -> {
            commandExecutor.createUserAndTask(props);
        });
        execService.execute(() -> {
            commandExecutor.getUsers(props);
        });
        execService.execute(() -> {
            commandExecutor.getTasks(props);
        });
        execService.shutdown();
    }
}
I haven't worked with ExecutorService before; until now I synchronized threads with the "synchronized" keyword. Can I use it here like this, with the commandExecutor instance as the mutex and every task synchronizing on it, as in the example below?
execService.execute(() -> {
    synchronized (commandExecutor) {
        commandExecutor.createUserAndTask(props);
    }
});
Or should I proceed differently with ExecutorService?
Upvotes: 0
Views: 681
Reputation: 1587
If you aren't interested in the natural sequential execution with Executors.newSingleThreadExecutor(), or even without an ExecutorService at all :), then let's look at a simplified illustration of what an ExecutorService is (Executors.newFixedThreadPool(4)):
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
final int nThreads = 4;
for (int i = 0; i < nThreads; i++) {
    new Thread(() -> {
        while (true) {
            try {
                tasks.take().run();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }).start();
}
tasks.offer(() -> System.out.println("Task 1"));
tasks.offer(() -> System.out.println("Task 2"));
tasks.offer(() -> System.out.println("Task 3"));
As you can see, there is no out-of-the-box tooling for synchronization or communication between tasks or threads. We cannot manage the threads directly, but we can manage our tasks, and that affects how the threads execute them. For instance:
public class CommandChain implements Runnable {
    public static CommandChain start(final ExecutorService executor, final Runnable command) {
        return new CommandChain(executor, command);
    }
    private final ExecutorService executor;
    private final Runnable command;
    private CommandChain then;
    private CommandChain(final ExecutorService executor, final Runnable command) {
        this.executor = executor;
        this.command = command;
    }
    public CommandChain then(final Runnable command) {
        then = new CommandChain(this.executor, command);
        return then;
    }
    @Override
    public void run() {
        command.run();
        if (then != null) {
            executor.submit(then);
        }
    }
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(executor, () -> System.out.println("Command 1"));
command1.then(() -> System.out.println("Command 2"))
        .then(() -> System.out.println("Command 3"));
executor.submit(command1);
This trick works with any type of ExecutorService, including a pool of any size or just Executors.newSingleThreadExecutor().
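For comparison, the JDK's own CompletableFuture supports the same "run this, then that" chaining on an arbitrary executor. The following is only a sketch of that alternative, not part of the CommandChain class above; the class name FutureChainSketch and the recorded strings are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: chains three steps with CompletableFuture.
final class FutureChainSketch {

    // Runs three steps strictly one after another on a 4-thread pool
    // and returns the order in which they actually ran.
    static List<String> runInOrder() {
        final ExecutorService executor = Executors.newFixedThreadPool(4);
        final List<String> order = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    .runAsync(() -> order.add("Command 1"), executor)
                    // each stage starts only after the previous one completed
                    .thenRunAsync(() -> order.add("Command 2"), executor)
                    .thenRunAsync(() -> order.add("Command 3"), executor)
                    .join(); // block until the whole chain has finished
        } finally {
            executor.shutdown();
        }
        return order;
    }

    public static void main(final String[] args) {
        System.out.println(runInOrder());
    }
}
```

As with the hand-written chain, no worker thread blocks while waiting for its predecessor, so the pool size does not matter here.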
Another option is to use a CountDownLatch:
public class CommandChain implements Runnable {
    public static CommandChain start(final Runnable command) {
        return new CommandChain(null, command);
    }
    private final CountDownLatch waitLatch;
    private final Runnable command;
    private CommandChain then;
    private CommandChain(final CountDownLatch waitLatch, final Runnable command) {
        this.waitLatch = waitLatch;
        this.command = command;
    }
    public CommandChain then(final Runnable command) {
        then = new CommandChain(new CountDownLatch(1), command);
        return then;
    }
    @Override
    public void run() {
        if (waitLatch != null) {
            try {
                waitLatch.await();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // we are here, because
                        // ExecutorService is stopping with
                        // interruption of its workers, so
                        // let's finish the execution
            }
        }
        command.run();
        if (then != null) {
            then.waitLatch.countDown();
        }
    }
}
final ExecutorService executor = Executors.newFixedThreadPool(4);
final CommandChain command1 = CommandChain.start(() -> System.out.println("Command 1"));
final CommandChain command2 = command1.then(() -> System.out.println("Command 2"));
final CommandChain command3 = command2.then(() -> System.out.println("Command 3"));
// the order doesn't matter
executor.submit(command3);
executor.submit(command1);
executor.submit(command2);
The disadvantage of this solution is that you need enough threads to execute all tasks in the worst case of blocking/awaiting, from the end of the chain to its start: the last command calls waitLatch.await() first, then the command before it calls its await, and so on. This means nThreads >= nTasks. Otherwise the execution can stall forever; just try Executors.newFixedThreadPool(1) with this example.
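The stall can be reproduced without the full chain. The sketch below is a reduced illustration, not the CommandChain class itself: the class LatchStarvationSketch, the method waiterFinishes and the 500 ms timeout are assumptions chosen for the demo. One task waits on a latch, and a second task that would open the latch is submitted after it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: shows why nThreads must be >= nTasks.
final class LatchStarvationSketch {

    // Returns true if the waiting task finished within the timeout.
    static boolean waiterFinishes(final int nThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        final CountDownLatch latch = new CountDownLatch(1);
        try {
            // Submitted first: occupies a worker until the latch opens.
            final Future<?> waiter = executor.submit(() -> {
                latch.await();
                return null;
            });
            // Submitted second: opens the latch, but only if a worker is free.
            executor.submit(latch::countDown);
            waiter.get(500, TimeUnit.MILLISECONDS);
            return true;
        } catch (final TimeoutException e) {
            return false; // the only worker is blocked, so nothing opens the latch
        } catch (final InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // interrupt the stuck worker, if any
        }
    }

    public static void main(final String[] args) {
        System.out.println("1 thread:  " + waiterFinishes(1));
        System.out.println("2 threads: " + waiterFinishes(2));
    }
}
```

With a single worker the waiting task never completes, because the task that would release it sits in the queue behind it; with two workers both run and the wait ends immediately.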
We can also manage the order of task/command execution from outside (in the main thread, for example) with the same shared CountDownLatches, CyclicBarriers, a Phaser, etc.
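A minimal sketch of that idea with CountDownLatch, under the assumption that the pool has at least as many threads as there are tasks: every task waits for its own "start" latch, and the calling thread opens the latches one by one, waiting for each task's "done" latch in between. The class ExternalOrderingSketch and its method are hypothetical names for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: the calling thread dictates the execution order.
final class ExternalOrderingSketch {

    static List<String> runInOrder(final int nTasks) {
        // Assumption: one worker per task, so every task can wait at once.
        final ExecutorService executor = Executors.newFixedThreadPool(nTasks);
        final List<String> order = new CopyOnWriteArrayList<>();
        final CountDownLatch[] start = new CountDownLatch[nTasks];
        final CountDownLatch[] done = new CountDownLatch[nTasks];
        try {
            // Submit in reverse to show that submission order is irrelevant.
            for (int i = nTasks - 1; i >= 0; i--) {
                final int id = i;
                start[id] = new CountDownLatch(1);
                done[id] = new CountDownLatch(1);
                executor.submit(() -> {
                    try {
                        start[id].await(); // wait for permission to run
                        order.add("Command " + (id + 1));
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done[id].countDown(); // report completion
                    }
                });
            }
            // Release the tasks one at a time, in the desired order.
            for (int i = 0; i < nTasks; i++) {
                start[i].countDown();
                done[i].await();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return order;
    }

    public static void main(final String[] args) {
        System.out.println(runInOrder(3));
    }
}
```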
Upvotes: 1
Reputation: 338181
Executors.newSingleThreadExecutor
If you need multiple tasks to be run sequentially, submit them to a single-threaded executor service.
ExecutorService es = Executors.newSingleThreadExecutor() ;
…
es.submit( task1 ) ;
es.submit( task2 ) ;
es.submit( task3 ) ;
Or reconsider if you even need the executor service at all. If the original thread waits for the series of tasks to run sequentially in a single-threaded executor service, the original thread could just as well run the tasks itself. If waiting on a single thread, there is no point to being threaded.
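A runnable sketch of the single-threaded approach, assuming the caller also wants to wait until all tasks are done: the tasks record their own number, and the 5-second timeout is an arbitrary choice for this demo. The class name SingleThreadOrderSketch is made up.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a single worker runs tasks in submission order.
final class SingleThreadOrderSketch {

    static List<Integer> runInOrder() {
        final ExecutorService es = Executors.newSingleThreadExecutor();
        final List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            final Runnable task = () -> order.add(id);
            es.submit(task); // queued FIFO behind the previous task
        }
        es.shutdown(); // accept no new tasks, finish the queued ones
        try {
            if (!es.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(final String[] args) {
        System.out.println(runInOrder());
    }
}
```

Because only one worker thread exists and its queue is FIFO, no locks or latches are needed to keep the order.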
Upvotes: 5