advancedstupid

Reputation: 53

Workflow Design pattern combined with Task Pattern?

I'm currently working on an enterprise application that performs long, non-linear tasks.

An abstraction of the workflow:

  1. Gather necessary information (can take minutes, but is not always needed)
  2. Process data (always takes very long)
  3. Notify several workers, which post-process the result (in new tasks)

Now, I have created two services that can handle steps 1 and 2. As the services shouldn't know of each other, I want a higher-order component that coordinates the 3 steps of a task. Think of it as a Callable that sends the task to service 1, wakes up again when service 1 returns a result, sends it to service 2, ..., sends the final result to all post-processors, and ends the task. But as there are likely to be 100,000s of queued tasks, I don't want to start 100,000s of threads with task-control callables, which would still be a massive overhead even if they are idle 99.9% of the time.

So does anybody have an idea for controlling this producer-consumer, queue-like pattern encapsulated in a task object, or does somebody know of a framework that would simplify this?

Upvotes: 3

Views: 1724

Answers (1)

fchauvel

Reputation: 1143

Besides actor frameworks, I would suggest two main approaches that work with plain old Java:

  • Using an ExecutorService to which we submit tasks. The proper sequencing of steps can be synchronized using Future objects. The overall set of tasks can be synchronized using a Phaser, as shown below.

  • Using the Fork/Join framework

Here is an example using a simple executor service. The Workflow class is given an executor and a phaser (a synchronization barrier). Each time the workflow is executed, it submits a new task for each of the steps (i.e., data collection, processing, and post-processing). Each task uses the phaser to indicate when it starts and stops.

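import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;

public class Workflow {

    private final ExecutorService executor;
    private final Phaser phaser;

    public Workflow(ExecutorService executor, Phaser phaser) {
        this.executor = executor;
        this.phaser = phaser;
    }

    public void execute(int request) {
        // Coordinating task: runs the three steps in order
        submitStep(() -> {
            // Data collection
            Future<Integer> input = submitStep(() -> {
                System.out.println("Gathering data for call " + request);
                return request;
            });
            // Wait for the collected data before processing starts
            final Integer data = input.get();
            // Data processing
            Future<Integer> result = submitStep(() -> {
                System.out.println("Processing call " + data);
                Thread.sleep(5000);
                return data;
            });
            // Wait for the result before notifying the post-processors
            final Integer processed = result.get();
            // Post-processing
            Future<Integer> ack = submitStep(() -> {
                System.out.println("Notifying processors for call " + processed);
                return processed;
            });
            return ack.get();
        });
    }

    // Registers with the phaser in the submitting thread, so the caller cannot
    // pass the barrier before the task is counted, and deregisters when the
    // task ends, even if it fails.
    private <T> Future<T> submitStep(Callable<T> task) {
        phaser.register();
        return executor.submit(() -> {
            try {
                return task.call();
            } finally {
                phaser.arriveAndDeregister();
            }
        });
    }

}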

The caller uses the phaser to know when all subtasks (steps) have completed before shutting down the executor.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final Phaser phaser = new Phaser();
    final ExecutorService executor = Executors.newCachedThreadPool();
    Workflow workflow = new Workflow(executor, phaser);

    phaser.register();
    for (int request=0 ; request<10 ; request++) {
        workflow.execute(request);
    }

    phaser.arriveAndAwaitAdvance();
    executor.shutdown();
    executor.awaitTermination(30, TimeUnit.SECONDS);    
}
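
For the second approach, here is a minimal sketch of the same workflow on the Fork/Join framework. The names ForkJoinWorkflow, PostProcessing, gather, process and runAll, the three post-processors, and the pool size of 4 are all assumptions made for illustration; gather and process are placeholders standing in for the two real services.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinWorkflow extends RecursiveTask<Integer> {

    // Hypothetical number of post-processors, chosen for illustration.
    private static final int POST_PROCESSORS = 3;

    private final int request;

    public ForkJoinWorkflow(int request) {
        this.request = request;
    }

    @Override
    protected Integer compute() {
        // Steps 1 and 2 depend on each other, so they run in order.
        int data = gather(request);
        int result = process(data);

        // Step 3: one subtask per post-processor. invokeAll forks them
        // and returns once all of them have completed.
        List<PostProcessing> subtasks = new ArrayList<>();
        for (int worker = 0; worker < POST_PROCESSORS; worker++) {
            subtasks.add(new PostProcessing(worker, result));
        }
        invokeAll(subtasks);

        int acknowledgements = 0;
        for (PostProcessing subtask : subtasks) {
            acknowledgements += subtask.join(); // already complete at this point
        }
        return acknowledgements;
    }

    // Placeholders standing in for the two real services (assumptions).
    private static int gather(int request) {
        return request;
    }

    private static int process(int data) {
        return data * 2;
    }

    private static final class PostProcessing extends RecursiveTask<Integer> {
        private final int worker;
        private final int result;

        PostProcessing(int worker, int result) {
            this.worker = worker;
            this.result = result;
        }

        @Override
        protected Integer compute() {
            System.out.println("Worker " + worker + " post-processing result " + result);
            return 1; // one acknowledgement per worker
        }
    }

    // Runs one workflow per request on a small pool and collects,
    // for each request, the number of acknowledgements received.
    public static List<Integer> runAll(int requests) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            List<ForkJoinWorkflow> workflows = new ArrayList<>();
            for (int request = 0; request < requests; request++) {
                ForkJoinWorkflow workflow = new ForkJoinWorkflow(request);
                pool.execute(workflow);
                workflows.add(workflow);
            }
            List<Integer> acknowledgements = new ArrayList<>();
            for (ForkJoinWorkflow workflow : workflows) {
                acknowledgements.add(workflow.join());
            }
            return acknowledgements;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(10));
    }
}
```

Workflows that have not started yet are plain objects waiting in the pool's queues, so the pool runs them on roughly as many threads as its parallelism level, however many requests are submitted. Fork/Join is best suited to steps that compute; a step that blocks while waiting on a remote service would tie up a worker thread for the duration of the wait.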

Upvotes: 2
