user1522820

Reputation: 1604

How to design an execution engine for a sequence of tasks

I am trying to code a problem in Java where I have to execute a bunch of tasks.

Problem

Execute a job which consists of multiple tasks and those tasks have dependencies among them.

A job will have a list of tasks, and each task will in turn have a list of successor tasks (each successor task has its own successor tasks, so the structure is recursive). A successor task can start executing if either:

  1. it is configured to start on partial completion of its predecessor task, in which case the predecessor notifies that it has partially completed and its successor tasks can start; or

  2. its predecessor task has completed successfully.

Example

Job having 2 initial tasks A and B. A has 2 successor tasks M and N. B has 1 successor task P. P has 2 successor tasks Y and Z.

M can start on partial completion of its predecessor task A. Z can start on partial completion of its predecessor task P. N, P and Y can start only on completion of their predecessor tasks A, B and P respectively.

[Figure: Tasks hierarchy (A and B can start in parallel)]

I have to design the execution of such a workflow/job. The design must handle the partial-completion event sent by a predecessor task so that its successor tasks can be started. How should I go about it? Is there a concurrency design pattern that suits this problem?

Upvotes: 27

Views: 6953

Answers (10)

craftsmannadeem

Reputation: 2953

There is a framework dedicated to exactly this purpose, called Dexecutor. With Dexecutor you model your requirements as a graph, and when it comes to execution, Dexecutor takes care of executing it reliably.

For example :

@Test
public void testDependentTaskExecution() {

    ExecutorService executorService = newExecutor();
    ExecutionEngine<Integer, Integer> executionEngine = new DefaultExecutionEngine<>(executorService);

    try {
        DefaultDependentTasksExecutor<Integer, Integer> executor = new DefaultDependentTasksExecutor<Integer, Integer>(
                executionEngine, new SleepyTaskProvider());

        executor.addDependency(1, 2);
        executor.addDependency(1, 2);
        executor.addDependency(1, 3);
        executor.addDependency(3, 4);
        executor.addDependency(3, 5);
        executor.addDependency(3, 6);
        executor.addDependency(2, 7);
        executor.addDependency(2, 9);
        executor.addDependency(2, 8);
        executor.addDependency(9, 10);
        executor.addDependency(12, 13);
        executor.addDependency(13, 4);
        executor.addDependency(13, 14);
        executor.addIndependent(11);

        executor.execute(ExecutionConfig.NON_TERMINATING);

        Collection<Node<Integer, Integer>> processedNodesOrder = Deencapsulation.getField(executor, "processedNodes");
        assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
        assertThat(processedNodesOrder).size().isEqualTo(14);

    } finally {
        try {
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}

private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
    List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
    result.add(new Node<Integer, Integer>(1));
    result.add(new Node<Integer, Integer>(2));
    result.add(new Node<Integer, Integer>(7));
    result.add(new Node<Integer, Integer>(9));
    result.add(new Node<Integer, Integer>(10));
    result.add(new Node<Integer, Integer>(8));
    result.add(new Node<Integer, Integer>(11));
    result.add(new Node<Integer, Integer>(12));
    result.add(new Node<Integer, Integer>(3));
    result.add(new Node<Integer, Integer>(13));
    result.add(new Node<Integer, Integer>(5));
    result.add(new Node<Integer, Integer>(6));
    result.add(new Node<Integer, Integer>(4));
    result.add(new Node<Integer, Integer>(14));
    return result;
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

    public Task<Integer, Integer> provideTask(final Integer id) {

        return new Task<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            public Integer execute() {
                if (id == 2) {
                    throw new IllegalArgumentException("Invalid task");
                }
                return id;
            }
        };
    }
}

Here is the modeled graph:

[Figure: dexecutor-graph.png, the modeled dependency graph]

This means tasks #1, #12 and #11 run in parallel. As soon as one of them finishes, its dependent tasks kick off; for example, once task #1 finishes, its dependent tasks #2 and #3 kick off.

Upvotes: 1

Hovo

Reputation: 790

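Abstracting away the multithreading:

interface SimpleTask {
    void run();
}

class TaskA {
    SimpleTask Y;  // successor started after the partial job
    SimpleTask Z;  // successor started after the remaining job

    SimpleTask PJ; // the partial job
    SimpleTask RJ; // the remaining job

    void run() {
        // do the partial job, then start the successor waiting on it
        PJ.run();
        Y.run();
        // do the remaining job, then start the successor waiting on full completion
        RJ.run();
        Z.run();
    }
}

class TaskB {
    TaskA P;       // successor started once B's job is done
    SimpleTask J;  // B's own job

    void run() {
        // do the job, then start the successor
        J.run();
        P.run();
    }
}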

Upvotes: 0

Giovanni Botta

Reputation: 9816

This question is very loaded. I can see at least three different subsystems in your design:

  • task DAG (partial completion to me just means you actually have 2 nodes instead of 1), which can be persisted in a database for example;
  • work queue, where the next task to execute must be enqueued (this could be some kind of message queue, again for persistence/transactionality);
  • the actual execution framework, which might involve accessing external resources like services/databases and can potentially be distributed.

Your description is very high-level, so you could start by designing the abstractions you need for these three pieces and how they interact with each other (in terms of interfaces).

Once you have all of that, you can start providing some simple implementations. I would start from a "local mode", using a simple in-memory DAG, a blocking queue and some type of Java executor.
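A minimal sketch of that "local mode", assuming the DAG is a plain in-memory map, the work queue is a `LinkedBlockingQueue` and the executor is a fixed thread pool. Following the idea of modeling partial completion as two nodes, `A_partial` stands for the first part of A's work and `A` for the rest. The class `LocalEngine` and the node names are hypothetical, and the graph is assumed to be acyclic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical "local mode": in-memory DAG + blocking work queue + Java executor.
class LocalEngine {
    // DAG: node -> successors. A partial completion is just one more node.
    private final Map<String, List<String>> successors = new HashMap<>();
    // Number of unfinished predecessors per node.
    private final Map<String, AtomicInteger> pending = new ConcurrentHashMap<>();
    // Work queue of nodes whose predecessors have all finished.
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();
    // Completion order, recorded for inspection.
    final List<String> completed = Collections.synchronizedList(new ArrayList<>());

    // Declares that `to` can start only after `from` has finished (setup only).
    void addEdge(String from, String to) {
        successors.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        successors.computeIfAbsent(to, k -> new ArrayList<>());
        pending.computeIfAbsent(from, k -> new AtomicInteger());
        pending.computeIfAbsent(to, k -> new AtomicInteger()).incrementAndGet();
    }

    void run(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        pending.forEach((node, count) -> {
            if (count.get() == 0) {
                ready.add(node); // roots are ready immediately
            }
        });
        try {
            // Dispatcher: hand each ready node to the executor. With a cycle,
            // take() would block forever, so the graph must be acyclic.
            for (int i = 0; i < pending.size(); i++) {
                String node = ready.take();
                pool.execute(() -> complete(node));
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private void complete(String node) {
        completed.add(node); // the node's real work would run here
        for (String next : successors.get(node)) {
            if (pending.get(next).decrementAndGet() == 0) {
                ready.add(next); // last predecessor finished: enqueue the successor
            }
        }
    }

    // The question's job. "X_partial" is the first part of X's work, so its
    // successors start on X's partial completion.
    static LocalEngine exampleJob() {
        LocalEngine e = new LocalEngine();
        e.addEdge("A_partial", "A");
        e.addEdge("A_partial", "M");
        e.addEdge("A", "N");
        e.addEdge("B", "P_partial");
        e.addEdge("P_partial", "P");
        e.addEdge("P_partial", "Z");
        e.addEdge("P", "Y");
        return e;
    }
}
```

Calling `LocalEngine.exampleJob().run(4)` executes all nine nodes; swapping the queue for a message queue or the map for a database later would not change the dispatcher loop.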

Your question does not provide details on SLA, length of jobs, failure/retry policies, transactions, etc., so it's hard to say how your modules should be implemented. But I suggest thinking in terms of high-level abstractions and iterating on the implementations. Great code will never fix bad design.

You could stop there, or start replacing each implementation with third party products if you need to.

Upvotes: 1

yadab

Reputation: 2143

Your question is interesting because one could simulate a simple neural network using this design. As for the answer, I would view the problem as one of task ordering rather than of multithreading/concurrency, because concurrency can be achieved simply by executing ordered tasks. Let's use event-driven programming to achieve that, because it gives nicely loosely coupled components. Since the design is now reactive in nature, each task has to signal its dependent tasks once it is done, so let's use the Observer pattern here.

Your tasks are both observables and observers, because they wait for notifications from their predecessors and notify their successors, which gives us the following construct:

// Task.java
import java.util.HashSet;
import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// java.util.Observable/Observer are deprecated since Java 9 but still work
public abstract class Task extends Observable implements Runnable, Observer {
    // a standard ReentrantLock (the JDK has no Mutex class)
    private final ReentrantLock lock = new ReentrantLock();
    private final String taskId;

    public String getTaskId() {
        return this.taskId;
    }

    private final Set<String> completedTasks;
    private final Set<String> shouldCompletedTasksBeforeStart;

    public Task(final String taskId) {
        this.taskId = taskId;
        this.completedTasks = new HashSet<>();
        this.shouldCompletedTasksBeforeStart = new HashSet<>();
    }

    @Override
    public void run() {
        while (true) {
            this.lock.lock();
            try {
                // start only once every predecessor has reported completion
                if (this.completedTasks.containsAll(this.shouldCompletedTasksBeforeStart)) {
                    doWork();
                    setChanged();
                    notifyObservers(this.taskId); // calls update() on each successor
                    // run once and free the pool thread; looping again would
                    // re-run tasks that have no predecessors every second
                    return;
                }
            } finally {
                this.lock.unlock();
            }
            try {
                // just some polling sleep, change it to fit your needs
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void update(final Observable observable, final Object arg) {
        this.lock.lock();
        try {
            this.completedTasks.add((String) arg);
        } finally {
            this.lock.unlock();
        }
    }

    public void addPredecessorTask(final Task task) {
        if (this.taskId.equals(task.taskId)) {
            return;
        }
        this.lock.lock();
        try {
            // the predecessor notifies this task (its observer) when it completes
            task.addObserver(this);
            this.shouldCompletedTasksBeforeStart.add(task.taskId);
        } finally {
            this.lock.unlock();
        }
    }

    protected abstract void doWork();

}

//HelloTask.java
public class HelloTask extends Task {
    public HelloTask(final String taskId) {
        super(taskId);
    }

    @Override
    protected void doWork() {
        System.out.println("Hello from " + getTaskId() + "!");
    }
}

//Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(final String[] args) {
        final HelloTask helloTaskA = new HelloTask("A");
        final HelloTask helloTaskB = new HelloTask("B");
        final HelloTask helloTaskC = new HelloTask("C");

        // A and C both wait for B
        helloTaskA.addPredecessorTask(helloTaskB);
        helloTaskC.addPredecessorTask(helloTaskB);

        // each waiting task occupies a thread while it polls,
        // so the pool needs at least as many threads as tasks
        final ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(helloTaskC);
        pool.execute(helloTaskA);
        pool.execute(helloTaskB);

        // let the submitted tasks finish, then release the threads
        pool.shutdown();
    }
}

The implementation is very basic and you can improve upon it, but it provides the foundational structure. It would be interesting to know where you are applying this.

Upvotes: 4

javaHunter

Reputation: 1097

Your problem seems to be a modified version of the Observer pattern. The solution below is more general than the plain pattern, as it allows a task to wait for a list of tasks before proceeding.

Create a class Task as follows:

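import java.util.ArrayList;
import java.util.List;

// Dependencies are wired before execution starts; a successor then runs
// synchronously in the thread of the predecessor that unblocks it.
abstract class Task {
    // Tasks dependent on the partial completion of this task
    private final List<Task> partialCompletionSuccessors = new ArrayList<>();
    // Tasks dependent on the full completion of this task
    private final List<Task> fullCompletionSuccessors = new ArrayList<>();

    // Tasks whose partial completion this task waits for before starting
    private final List<Task> partialCompletionPredecessors = new ArrayList<>();
    // Tasks whose full completion this task waits for before starting
    private final List<Task> fullCompletionPredecessors = new ArrayList<>();

    private boolean started = false;

    public void addPartialCompletionSuccessor(Task successor) {
        partialCompletionSuccessors.add(successor);
        successor.partialCompletionPredecessors.add(this);
    }

    public void addFullCompletionSuccessor(Task successor) {
        fullCompletionSuccessors.add(successor);
        successor.fullCompletionPredecessors.add(this);
    }

    protected abstract void doPartialWork();   // partial-completion successors start after this

    protected abstract void doRemainingWork(); // full-completion successors start after this

    private void notifySuccessorsOfPartialCompletion() {
        for (Task task : partialCompletionSuccessors) {
            task.notifyOfPartialCompletion(this);
        }
    }

    private void notifySuccessorsOfFullCompletion() {
        for (Task task : fullCompletionSuccessors) {
            task.notifyOfFullCompletion(this); // full-completion successors get the full-completion event
        }
    }

    // Starts the task once nothing is left to wait for; call it directly on root tasks
    public synchronized void tryToProceed() {
        if (started || !partialCompletionPredecessors.isEmpty()
                || !fullCompletionPredecessors.isEmpty()) {
            return;
        }
        started = true; // never run a task twice
        doPartialWork();
        notifySuccessorsOfPartialCompletion(); // when this task partially completes
        doRemainingWork();
        notifySuccessorsOfFullCompletion();    // when this task fully completes
    }

    // Notifies this task that a predecessor task has partially completed
    public synchronized void notifyOfPartialCompletion(Task task) {
        partialCompletionPredecessors.remove(task);
        tryToProceed();
    }

    // Notifies this task that a predecessor task has fully completed
    public synchronized void notifyOfFullCompletion(Task task) {
        fullCompletionPredecessors.remove(task);
        tryToProceed();
    }
}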

Upvotes: 3

Stefan Ferstl

Reputation: 5265

Your problem looks like a good use case for Java's ForkJoin Framework. You could implement your tasks as RecursiveTasks (if you need a return value) or RecursiveActions (if you don't), which start their subtasks on whatever condition you need. You'll also be able to control whether your subtasks run sequentially or in parallel.

Example:

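import java.util.concurrent.RecursiveAction;

public class TaskA extends RecursiveAction {
  // ... (conditionForTaskM, TaskM and TaskN are defined elsewhere)

  @Override
  protected void compute() {
    TaskM m = null;
    if (conditionForTaskM) {
      m = new TaskM();
      // fork() runs M asynchronously; m.invoke() or invokeAll(m)
      // would run it and wait for it to finish
      m.fork();
    }

    // Run task N at the end of A (invokeAll waits for N)
    invokeAll(new TaskN());

    if (m != null) {
      m.join(); // wait for M before A itself completes
    }
  }

}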

You need an instance of the ForkJoinPool to run your tasks:

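public static void main(String[] args) {
  ForkJoinPool pool = new ForkJoinPool();
  // invoke() waits for TaskA and its subtasks; submit() returns immediately, and
  // because pool threads are daemons, main could exit before the work runs
  pool.invoke(new TaskA());

  pool.shutdown();
}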

This example is simplistic and implements only part of your example problem. But generally speaking, the ForkJoin Framework lets you create tree-like structures of tasks in which every parent task (such as A, B and P) controls the execution of its immediate child tasks.
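To illustrate the partial-completion case with the same framework, here is a hypothetical sketch of the B -> P -> {Z, Y} branch of the question's job. It assumes P's work splits into a first and a second part and records each step in a shared list; the class names `ForkJoinJob`, `Leaf`, `TaskP` and `TaskB` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical sketch of the B -> P -> {Z, Y} branch of the question's job.
class ForkJoinJob {
    // Shared log of finished steps, for illustration only.
    static final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    // A successor task that just records that it ran.
    static class Leaf extends RecursiveAction {
        private final String name;

        Leaf(String name) {
            this.name = name;
        }

        @Override
        protected void compute() {
            steps.add(name);
        }
    }

    static class TaskP extends RecursiveAction {
        @Override
        protected void compute() {
            steps.add("P partial");      // first part of P's work
            Leaf z = new Leaf("Z");
            z.fork();                    // Z starts on partial completion and may run in parallel
            steps.add("P done");         // remaining part of P's work
            Leaf y = new Leaf("Y");
            y.fork();                    // Y starts only once P's work is complete
            z.join();
            y.join();
        }
    }

    static class TaskB extends RecursiveAction {
        @Override
        protected void compute() {
            steps.add("B done");
            new TaskP().invoke();        // P starts only on completion of B
        }
    }

    static void run() {
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new TaskB());        // blocks until B and all its subtasks finish
        pool.shutdown();
    }
}
```

The order of Z relative to "P done" is not fixed, which is exactly the parallelism that partial completion allows; Y, however, always follows "P done".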

Upvotes: 6

mindas

Reputation: 26723

If you want to reinvent the wheel and develop the solution yourself, fine; it's your choice. However, doing this properly is rather hard, especially the threading part. If you can consider some outside help, at least with the building blocks, those could be:

  • Guava's ListenableFuture: with this library you can create Callables and feed them to a special thread pool executor, which then allows custom callbacks on ListenableFuture completion.
  • You could take a look at RxJava's Observable, which allows mixing and matching different tasks. It uses a non-imperative coding style, so beware!
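Guava is an extra dependency; the same callback idea can be sketched with the JDK's own `CompletableFuture`, swapped in here in place of `ListenableFuture`. In this hypothetical wiring of the question's job, each partial-completion event is a separate future that the predecessor completes halfway through its work; `FutureJob` and `twoPhase` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the question's job wired with CompletableFuture callbacks.
class FutureJob {
    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Runs a task that completes `partial` halfway through its work.
    static CompletableFuture<Void> twoPhase(String name, CompletableFuture<Void> partial,
                                            ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            log.add(name + " partial");
            partial.complete(null);   // fires callbacks waiting on partial completion
            log.add(name + " done");
        }, pool);
    }

    static void run() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> aPartial = new CompletableFuture<>();
        CompletableFuture<Void> pPartial = new CompletableFuture<>();

        // A and B start in parallel
        CompletableFuture<Void> aDone = twoPhase("A", aPartial, pool);
        CompletableFuture<Void> bDone = CompletableFuture.runAsync(() -> log.add("B"), pool);
        // P starts only when B has completed
        CompletableFuture<Void> pDone =
                bDone.thenComposeAsync(v -> twoPhase("P", pPartial, pool), pool);

        // Successors are callbacks on their predecessor's partial or full completion
        CompletableFuture<Void> m = aPartial.thenRunAsync(() -> log.add("M"), pool);
        CompletableFuture<Void> n = aDone.thenRunAsync(() -> log.add("N"), pool);
        CompletableFuture<Void> z = pPartial.thenRunAsync(() -> log.add("Z"), pool);
        CompletableFuture<Void> y = pDone.thenRunAsync(() -> log.add("Y"), pool);

        CompletableFuture.allOf(m, n, z, y).join(); // the leaf tasks finish last
        pool.shutdown();
    }
}
```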

Upvotes: 3

Lex Lian

Reputation: 585

There are two options in terms of patterns, but in essence they are pretty similar. In either case, cyclic dependencies (e.g. A -> B -> A) need to be treated as an error in the task dependency configuration.
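One way to implement that configuration check is Kahn's topological sort (a standard technique, not something the answer prescribes): if not every task can be put in order, the remaining tasks lie on or downstream of a cycle. The `Map` representation and the `CycleCheck` name are assumptions for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical configuration check based on Kahn's topological sort.
class CycleCheck {
    // `graph` maps each task to its successors. Returns an order in which the
    // tasks could start, or throws if the dependencies contain a cycle.
    static List<String> topologicalOrder(Map<String, List<String>> graph) {
        Map<String, Integer> inDegree = new HashMap<>();
        graph.forEach((task, successors) -> {
            inDegree.putIfAbsent(task, 0);
            for (String s : successors) {
                inDegree.merge(s, 1, Integer::sum); // one more predecessor for s
            }
        });

        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((task, degree) -> {
            if (degree == 0) {
                ready.add(task);
            }
        });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task);
            for (String s : graph.getOrDefault(task, Collections.emptyList())) {
                if (inDegree.merge(s, -1, Integer::sum) == 0) {
                    ready.add(s); // all predecessors of s are already ordered
                }
            }
        }
        // Tasks on (or downstream of) a cycle never reach in-degree 0.
        if (order.size() != inDegree.size()) {
            throw new IllegalArgumentException("Cyclic task dependencies detected");
        }
        return order;
    }
}
```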

  1. Mediator pattern

After each task[i] finishes, it notifies the Mediator, and the Mediator then notifies all successor tasks of task[i]. The graph of task dependencies is read when the execution engine starts and serves as the data structure the Mediator uses.
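A minimal sketch of the Mediator variant, assuming tasks report events by name and the dependency graph is loaded into maps before the job starts. Partial completion is simply one more event the Mediator routes; the `Mediator` class, its methods and the `A.partial`/`A.done` event naming are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical Mediator: tasks report events to it and never reference each other.
class Mediator {
    private final Map<String, List<String>> waiters = new HashMap<>(); // event -> waiting tasks
    private final Map<String, Integer> unmet = new HashMap<>();        // task -> events still awaited
    private final Map<String, Consumer<Mediator>> bodies = new HashMap<>();
    private final Executor executor;
    final List<String> launched = new ArrayList<>();                   // launch order, guarded by this

    Mediator(Executor executor) {
        this.executor = executor;
    }

    // Every task must be registered; its body reports events such as "A.partial".
    void register(String task, Consumer<Mediator> body) {
        bodies.put(task, body);
        unmet.putIfAbsent(task, 0);
    }

    // `task` may start only after `event` has been reported.
    void dependsOn(String task, String event) {
        waiters.computeIfAbsent(event, e -> new ArrayList<>()).add(task);
        unmet.merge(task, 1, Integer::sum);
    }

    synchronized void start() {
        List<String> roots = new ArrayList<>();
        unmet.forEach((task, n) -> {
            if (n == 0) {
                roots.add(task);
            }
        });
        roots.forEach(this::launch);
    }

    // Called by tasks; the Mediator looks up which tasks were waiting for the event.
    synchronized void onEvent(String event) {
        for (String task : waiters.getOrDefault(event, Collections.emptyList())) {
            if (unmet.merge(task, -1, Integer::sum) == 0) {
                launch(task);
            }
        }
    }

    private void launch(String task) {
        launched.add(task);
        executor.execute(() -> bodies.get(task).accept(this));
    }
}
```

Wiring the question's job means registering each task body and declaring, for example, `dependsOn("M", "A.partial")` and `dependsOn("N", "A.done")`; passing a thread pool as the `Executor` lets independent tasks run in parallel.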

  2. Publish/Subscribe pattern via a message bus.

The graph of task dependencies is read when the engine starts, and each task subscribes on the MessageBus to the completion message of its predecessor task (task[i]) on topic[i].

When task[i] finishes, it sends a completion message to the MessageBus on topic[i]. Each task that subscribes to topic[i] gets notified and starts to work.
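An in-process sketch of the publish/subscribe variant, assuming a simple topic-to-subscribers map stands in for the message bus (a real broker would take its place in a distributed setup). A task is submitted to the executor once it has received a message on every topic it subscribed to; each topic is assumed to be published exactly once. `MessageBus`, `BusTask.wire` and topic names such as `A.partial` are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-process message bus: topic -> subscriber callbacks.
class MessageBus {
    private final Map<String, List<Runnable>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, Runnable onMessage) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(onMessage);
    }

    void publish(String topic) {
        for (Runnable subscriber : subscribers.getOrDefault(topic, Collections.emptyList())) {
            subscriber.run();
        }
    }
}

class BusTask {
    // Subscribes `body` to its predecessors' topics; it is submitted to the
    // executor when the last awaited message arrives.
    static void wire(MessageBus bus, Executor executor, Runnable body, String... topics) {
        AtomicInteger remaining = new AtomicInteger(topics.length);
        for (String topic : topics) {
            bus.subscribe(topic, () -> {
                if (remaining.decrementAndGet() == 0) {
                    executor.execute(body);
                }
            });
        }
    }
}
```

Root tasks such as A and B can subscribe to a `job.start` topic, and the job is kicked off with `bus.publish("job.start")` once every task has been wired, so no completion message is published before its subscribers exist.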

Upvotes: 4

Nimrod007

Reputation: 9913

Take a look at Akka: http://akka.io

Using Akka, you create actors (event-driven, concurrent entities that process messages asynchronously).

Each task can be represented as an actor (you choose when to fire it up).

You can trigger other actors (tasks) on partial or full completion (in fact, you can trigger them whenever you want).
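Akka is a separate dependency, so to show only the actor idea, here is a plain-Java sketch that is not Akka's API: each task owns a mailbox (a single-threaded executor that processes its messages one at a time, so the task's state needs no locks) and triggers its successors by sending them messages. `TaskActor`, `triggerOnPartial`, `triggerOnDone` and `tell` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical plain-Java actor (not Akka's API): private state plus a mailbox
// whose messages are processed one at a time, so the state needs no locks.
class TaskActor {
    private final String name;
    private final List<String> log;             // shared, must be thread-safe
    private final CountDownLatch finished;      // counted down when this task ends
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final List<TaskActor> onPartial = new ArrayList<>();
    private final List<TaskActor> onDone = new ArrayList<>();
    private int signalsNeeded = 0;              // predecessor signals still awaited

    TaskActor(String name, List<String> log, CountDownLatch finished) {
        this.name = name;
        this.log = log;
        this.finished = finished;
    }

    // Wiring, done before any message is sent.
    void triggerOnPartial(TaskActor next) {
        onPartial.add(next);
        next.signalsNeeded++;
    }

    void triggerOnDone(TaskActor next) {
        onDone.add(next);
        next.signalsNeeded++;
    }

    // Asynchronous send: the signal is queued and handled on the mailbox thread.
    void tell() {
        mailbox.execute(this::receive);
    }

    private void receive() {
        if (signalsNeeded > 0 && --signalsNeeded > 0) {
            return; // still waiting for other predecessors
        }
        log.add(name + " partial");
        onPartial.forEach(TaskActor::tell);     // trigger on partial completion
        log.add(name + " done");
        onDone.forEach(TaskActor::tell);        // trigger on full completion
        finished.countDown();
        mailbox.shutdown();                     // no further messages expected
    }
}
```

Root actors (A and B in the question) are started with a single `tell()`; every other actor starts when the last of its predecessors signals it.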

Upvotes: 7

Olivier Meurice

Reputation: 562

If I understood your need correctly, you could use a workflow engine like Activiti to solve it. I think that would be easier than reinventing a workflow engine for your specific need.

Upvotes: 3
