Reputation: 53
I'm currently working on an enterprise application that performs long, non-linear tasks.
An abstraction of the workflow:
Now, I have created two services that can solve steps 1 and 2. As the services shouldn't know of each other, I want a higher-order component that coordinates the 3 steps of a task. Think of it as a Callable which sends the task to service 1, wakes up again when service 1 returns a result, sends it to service 2, ..., sends the final result to all post-processors, and ends the task. But as there are likely to be 100,000s of queued tasks, I don't want to start 100,000s of threads with task-control callables which, even if idle 99.9% of the time, would still be a massive overhead.
So does anybody have an idea for controlling this producer-consumer, queue-like pattern encapsulated in a task object, or know of a framework that would simplify this?
Upvotes: 3
Views: 1724
Reputation: 1143
Besides actor frameworks, I would suggest two main approaches that work with plain old Java:
Using an ExecutorService to which we submit tasks. The proper sequencing of steps can be synchronized using Future objects. The overall set of tasks can be synchronized using a Phaser, as shown below.
Using the Fork/Join framework
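For the Fork/Join option, a minimal sketch could look like the following. It is only an illustration under assumptions: the class name ForkJoinWorkflow and the three step methods (collect, process, notifyProcessors) are hypothetical placeholders for the real services, and the approach fits best when the steps are mostly computation rather than blocking I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinWorkflow {

    // Hypothetical placeholder steps standing in for the real services.
    static int collect(int request) { return request; }
    static int process(int data) { return data * 2; }
    static int notifyProcessors(int result) { return result; }

    /** One request: the three steps run in order inside compute(). */
    static class RequestTask extends RecursiveTask<Integer> {
        private final int request;

        RequestTask(int request) { this.request = request; }

        @Override
        protected Integer compute() {
            int collected = collect(request);
            int processed = process(collected);
            return notifyProcessors(processed);
        }
    }

    /** Runs 'count' requests on a shared pool and returns results in request order. */
    static List<Integer> runAll(int count) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RecursiveTask<List<Integer>>() {
                @Override
                protected List<Integer> compute() {
                    List<RequestTask> tasks = new ArrayList<>();
                    for (int i = 0; i < count; i++) {
                        tasks.add(new RequestTask(i));
                    }
                    // Schedules all subtasks on the pool and waits for them to finish.
                    invokeAll(tasks);
                    List<Integer> results = new ArrayList<>();
                    for (RequestTask task : tasks) {
                        results.add(task.join());
                    }
                    return results;
                }
            });
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(5)); // prints [0, 2, 4, 6, 8]
    }
}
```

The pool size stays bounded (by default, the number of available processors), so the number of queued requests does not translate into the same number of threads.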
Here is an example using a simple executor service. The Workflow class is given an executor and a phaser (a synchronization barrier). Each time the workflow is executed, it submits a new task for each of the steps (i.e., data collection, processing, and post-processing). Each task uses the phaser to indicate when it starts and stops.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;

public class Workflow {
    private final ExecutorService executor;
    private final Phaser phaser;

    public Workflow(ExecutorService executor, Phaser phaser) {
        this.executor = executor;
        this.phaser = phaser;
    }

    public void execute(int request) {
        // Register before submitting, so the caller cannot pass the barrier
        // before this workflow has been counted.
        phaser.register();
        executor.submit(() -> {
            try {
                // Data collection
                Future<Integer> input = executor.submit(() -> {
                    phaser.register();
                    System.out.println("Gathering data for call " + request);
                    phaser.arrive();
                    return request;
                });
                // Wait for the collected data before starting the next step.
                final Integer data = input.get();
                // Data processing
                Future<Integer> result = executor.submit(() -> {
                    phaser.register();
                    System.out.println("Processing call " + data);
                    Thread.sleep(5000);
                    phaser.arrive();
                    return data;
                });
                final Integer processed = result.get();
                // Post-processing
                Future<Integer> ack = executor.submit(() -> {
                    phaser.register();
                    System.out.println("Notifying processors for call " + processed);
                    phaser.arrive();
                    return processed;
                });
                return ack.get();
            } finally {
                // Signal completion of the whole workflow, even if a step failed.
                phaser.arrive();
            }
        });
    }
}
The caller uses the phaser to know when all subtasks (steps) have completed before shutting down the executor.
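import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

// The wrapper class name is chosen for illustration only.
public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Phaser phaser = new Phaser();
        final ExecutorService executor = Executors.newCachedThreadPool();
        Workflow workflow = new Workflow(executor, phaser);
        // The caller is a party too, so the phase cannot advance without it.
        phaser.register();
        for (int request = 0; request < 10; request++) {
            workflow.execute(request);
        }
        // Arrive, then wait until all other registered parties have arrived.
        phaser.arriveAndAwaitAdvance();
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}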
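Note that in the executor example, the coordinating task still occupies a pool thread while it waits on a Future, and a single Phaser supports at most 65,535 registered parties. With 100,000s of queued tasks, one possible alternative (not part of the example above, just a sketch under assumptions) is to chain the steps with CompletableFuture on a fixed-size pool, so that no thread is parked between steps. The class name AsyncWorkflow and the three step methods are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncWorkflow {
    private final ExecutorService executor;

    AsyncWorkflow(ExecutorService executor) {
        this.executor = executor;
    }

    // Hypothetical placeholder steps standing in for the real services.
    static int collect(int request) { return request; }
    static int process(int data) { return data + 100; }
    static int notifyProcessors(int result) { return result; }

    /** Chains the three steps; each step is scheduled only when the previous one completes. */
    CompletableFuture<Integer> execute(int request) {
        return CompletableFuture
                .supplyAsync(() -> collect(request), executor)
                .thenApplyAsync(AsyncWorkflow::process, executor)
                .thenApplyAsync(AsyncWorkflow::notifyProcessors, executor);
    }

    public static void main(String[] args) {
        // A small fixed pool serves all workflows.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            AsyncWorkflow workflow = new AsyncWorkflow(executor);
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (int request = 0; request < 10; request++) {
                pending.add(workflow.execute(request));
            }
            // Wait for every workflow to finish before shutting down.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            System.out.println("Last result: " + pending.get(9).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

If the services themselves return a CompletableFuture, thenCompose can be used in place of thenApplyAsync, so that a thread is not held while a remote call is in flight.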
Upvotes: 2