Reputation: 1604
I am trying to solve a problem in Java where I have to execute a bunch of tasks.
Problem
Execute a job which consists of multiple tasks and those tasks have dependencies among them.
A job will have a list of tasks, and each such task will in turn have a list of successor tasks (each successor task will have its own successor tasks - you can see the recursive nature here). A successor task can start its execution if either:
- it is configured to start on partial execution of its predecessor task - in this case the predecessor task notifies that it has partially completed, and its successor tasks can start; or
- its predecessor task has completed successfully.
Example
A job has two initial tasks, A and B. A has two successor tasks, M and N. B has one successor task, P. P has two successor tasks, Y and Z.
M can start on partial completion of its predecessor task A. Z can start on partial completion of its predecessor task P. N, P and Y can start only on completion of their predecessor tasks A, B and P respectively.
I have to design the execution of such a workflow/job. The design has to acknowledge the partial-completion event sent by a predecessor task so that its successor tasks can be started. How should I go about it? Is there a concurrency design pattern that suits this problem?
Upvotes: 27
Views: 6953
Reputation: 2953
There is a framework specifically dedicated to this purpose, called Dexecutor. With Dexecutor you model your requirements as a graph, and when it comes to execution, Dexecutor takes care of executing it in a reliable way.
For example:
@Test
public void testDependentTaskExecution() {

    ExecutorService executorService = newExecutor();
    ExecutionEngine<Integer, Integer> executionEngine = new DefaultExecutionEngine<>(executorService);

    try {
        DefaultDependentTasksExecutor<Integer, Integer> executor = new DefaultDependentTasksExecutor<Integer, Integer>(
                executionEngine, new SleepyTaskProvider());

        executor.addDependency(1, 2);
        executor.addDependency(1, 2);
        executor.addDependency(1, 3);
        executor.addDependency(3, 4);
        executor.addDependency(3, 5);
        executor.addDependency(3, 6);
        executor.addDependency(2, 7);
        executor.addDependency(2, 9);
        executor.addDependency(2, 8);
        executor.addDependency(9, 10);
        executor.addDependency(12, 13);
        executor.addDependency(13, 4);
        executor.addDependency(13, 14);
        executor.addIndependent(11);

        executor.execute(ExecutionConfig.NON_TERMINATING);

        Collection<Node<Integer, Integer>> processedNodesOrder = Deencapsulation.getField(executor, "processedNodes");

        assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
        assertThat(processedNodesOrder).size().isEqualTo(14);
    } finally {
        try {
            executorService.shutdownNow();
            executorService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
    }
}

private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
    List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
    result.add(new Node<Integer, Integer>(1));
    result.add(new Node<Integer, Integer>(2));
    result.add(new Node<Integer, Integer>(7));
    result.add(new Node<Integer, Integer>(9));
    result.add(new Node<Integer, Integer>(10));
    result.add(new Node<Integer, Integer>(8));
    result.add(new Node<Integer, Integer>(11));
    result.add(new Node<Integer, Integer>(12));
    result.add(new Node<Integer, Integer>(3));
    result.add(new Node<Integer, Integer>(13));
    result.add(new Node<Integer, Integer>(5));
    result.add(new Node<Integer, Integer>(6));
    result.add(new Node<Integer, Integer>(4));
    result.add(new Node<Integer, Integer>(14));
    return result;
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

    public Task<Integer, Integer> provideTask(final Integer id) {

        return new Task<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            public Integer execute() {
                if (id == 2) {
                    throw new IllegalArgumentException("Invalid task");
                }
                return id;
            }
        };
    }
}
Here is the modeled graph
This means tasks #1, #12 and #11 would run in parallel. Once one of them finishes, its dependent tasks kick off; for example, once task #1 finishes, its dependent tasks #2 and #3 kick off.
Upvotes: 1
Reputation: 790
Abstracting away from the multithreading:
class TaskA {
    SimpleTask Y;
    SimpleTask Z;
    SimpleTask PJ; // the partial job
    SimpleTask RJ; // the remaining job

    void run() {
        // do the partial job
        PJ.run();
        Y.run();
        // do the remaining job
        RJ.run();
        Z.run();
    }
}

class TaskB {
    TaskA P;
    SimpleTask J; // the job

    void run() {
        // do the job
        J.run();
        P.run();
    }
}
Upvotes: 0
Reputation: 9816
This question is very loaded. I can see at least three different subsystems in your design: a model of the tasks and their dependencies (a DAG), a queue of tasks that are ready to run, and an execution engine that runs them.
Your description is very high level, so you could start by designing the abstractions you need for these three pieces and how they interact with each other (in terms of interfaces).
Once you have all of that, you can start providing some simple implementations. I would start with a "local mode", using a simple in-memory DAG, a blocking queue and some type of Java executor (a sketch of such a local mode follows below).
Your question does not provide details on SLAs, length of jobs, failure/retry policies, transactions, etc., so it's hard to say how your modules should be implemented. But I suggest thinking in terms of high-level abstractions and iterating on the implementations. Great code will never fix bad design.
You could stop there, or start replacing each implementation with third party products if you need to.
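A minimal sketch of such a local mode, assuming nothing beyond the JDK. All class and method names below are made up for illustration; the executor's internal work queue plays the role of the blocking queue, and a partial-completion edge could be wired exactly like the full-completion edges shown here:

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative "local mode" runner: in-memory DAG + executor.
// Build the whole graph (addTask/addDependency) before calling run().
class LocalJobRunner {
    private final Map<String, Runnable> tasks = new HashMap<>();
    private final Map<String, Set<String>> successors = new HashMap<>();
    private final Map<String, AtomicInteger> pendingPredecessors = new HashMap<>();
    // The executor's work queue acts as the "queue of ready tasks".
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    void addTask(String id, Runnable body) {
        tasks.put(id, body);
        successors.put(id, new HashSet<>());
        pendingPredecessors.put(id, new AtomicInteger(0));
    }

    // "to" becomes ready once "from" completes.
    void addDependency(String from, String to) {
        successors.get(from).add(to);
        pendingPredecessors.get(to).incrementAndGet();
    }

    void run() {
        // Seed the pool with tasks that have no predecessors.
        for (String id : tasks.keySet()) {
            if (pendingPredecessors.get(id).get() == 0) {
                submit(id);
            }
        }
        // (pool shutdown omitted for brevity)
    }

    private void submit(String id) {
        pool.execute(() -> {
            tasks.get(id).run();
            onCompleted(id);
        });
    }

    private void onCompleted(String id) {
        for (String next : successors.get(id)) {
            // Atomically count down; the last predecessor to finish makes the successor ready.
            if (pendingPredecessors.get(next).decrementAndGet() == 0) {
                submit(next);
            }
        }
    }
}

When a task finishes it atomically counts down each successor's remaining-predecessor counter; the last predecessor to finish is the one that submits the successor, so no extra locking is needed around the hand-off.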
Upvotes: 1
Reputation: 2143
Your question is interesting because this design could also be used to simulate a simple neural network. As far as the answer goes, I would view the problem as task ordering rather than as a multithreading/concurrency problem, because concurrency can be achieved simply by executing the ordered tasks. Let's use event-driven programming to achieve that, since it gives us nicely loosely coupled components. Our design is then reactive in nature: we only need to signal the dependent tasks once we are done, so let's use the Observer pattern here.
Your tasks are both observable and observer, because they wait for notifications from their predecessors and notify their successors, which gives us the following construct.
// Task.java
import java.util.HashSet;
import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Note: java.util.Observable/Observer are deprecated since Java 9; they keep this example short.
public abstract class Task extends Observable implements Runnable, Observer {

    private final ReentrantLock lock = new ReentrantLock();

    private final String taskId;

    public String getTaskId() {
        return this.taskId;
    }

    private final Set<String> completedTasks;
    private final Set<String> shouldCompletedTasksBeforeStart;

    public Task(final String taskId) {
        this.taskId = taskId;
        this.completedTasks = new HashSet<>();
        this.shouldCompletedTasksBeforeStart = new HashSet<>();
    }

    @Override
    public void run() {
        while (true) {
            this.lock.lock();
            if (this.completedTasks.equals(this.shouldCompletedTasksBeforeStart)) {
                doWork();
                setChanged();
                notifyObservers(this.taskId);
                // reset
                this.completedTasks.clear();
            }
            this.lock.unlock();
            try {
                // just some polling sleep; change it to whatever fits you
                Thread.sleep(1000);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void update(final Observable observable, final Object arg) {
        this.lock.lock();
        this.completedTasks.add((String) arg);
        this.lock.unlock();
    }

    public void addPredecessorTask(final Task task) {
        if (this.taskId.equals(task.taskId)) {
            return;
        }
        this.lock.lock();
        // Notice: this is the little bit of logic that makes the predecessor/successor wiring work
        task.addObserver(this);
        this.shouldCompletedTasksBeforeStart.add(task.taskId);
        this.lock.unlock();
    }

    protected abstract void doWork();
}
//HelloTask.java
public class HelloTask extends Task {

    public HelloTask(final String taskId) {
        super(taskId);
    }

    @Override
    protected void doWork() {
        System.out.println("Hello from " + getTaskId() + "!");
    }
}
//Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(final String[] args) {
        final HelloTask helloTaskA = new HelloTask("A");
        final HelloTask helloTaskB = new HelloTask("B");
        final HelloTask helloTaskC = new HelloTask("C");

        // A and C both wait for B to complete
        helloTaskA.addPredecessorTask(helloTaskB);
        helloTaskC.addPredecessorTask(helloTaskB);

        final ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(helloTaskC);
        pool.execute(helloTaskA);
        pool.execute(helloTaskB);
    }
}
The implementation is a very basic one and you can improve upon it, but it gives you the foundational structure. It would be interesting to know where you are applying this.
Upvotes: 4
Reputation: 1097
Your problem seems to be a modified version of the Observer pattern. The solution below is more general than plain Observer, as it allows a task to depend on a list of tasks before it can proceed.
Create a class Task as follows:
import java.util.ArrayList;
import java.util.List;

class Task {

    List<Task> partialCompletionSuccessors = new ArrayList<>();   // tasks that depend on the partial completion of this task
    List<Task> fullCompletionSuccessors = new ArrayList<>();      // tasks that depend on the full completion of this task
    List<Task> partialCompletionPredecessors = new ArrayList<>(); // tasks whose partial completion this task waits for
    List<Task> fullCompletionPredecessors = new ArrayList<>();    // tasks whose full completion this task waits for

    private void notifySuccessorsOfPartialCompletion() {
        for (Task task : partialCompletionSuccessors) {
            task.notifyOfPartialCompletion(this);
        }
    }

    private void notifySuccessorsOfFullCompletion() {
        for (Task task : fullCompletionSuccessors) {
            task.notifyOfFullCompletion(this);
        }
    }

    private void tryToProceed() {
        if (partialCompletionPredecessors.isEmpty() && fullCompletionPredecessors.isEmpty()) {
            // Start the following task...
            // ...
            // When this task partially completes
            notifySuccessorsOfPartialCompletion();
            // When this task fully completes
            notifySuccessorsOfFullCompletion();
        }
    }

    // Notifies this task that a predecessor task has partially completed
    public void notifyOfPartialCompletion(Task task) {
        partialCompletionPredecessors.remove(task);
        tryToProceed();
    }

    // Notifies this task that a predecessor task has fully completed
    public void notifyOfFullCompletion(Task task) {
        fullCompletionPredecessors.remove(task);
        tryToProceed();
    }
}
Upvotes: 3
Reputation: 5265
Your problem looks like a good use case for Java's ForkJoin framework. You could implement your tasks as RecursiveActions or RecursiveTasks (depending on whether you need a return value or not), which will start their sub-tasks on whatever condition you need. You'll also be able to control whether your sub-tasks run sequentially or in parallel.
Example:
public class TaskA extends RecursiveAction {
    // ...
    protected void compute() {
        if (conditionForTaskM) {
            TaskM m = new TaskM();
            // fork() starts task M asynchronously so that A can carry on;
            // use m.invoke() (or invokeAll(m)) to run it synchronously instead.
            m.fork();
        }
        // Run task N at the end of A
        invokeAll(new TaskN());
    }
}
You need an instance of the ForkJoinPool to run your tasks:
public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    // join() waits for the whole task tree; without it the JVM may exit
    // before the pool's daemon worker threads have finished.
    pool.submit(new TaskA()).join();
    // Properly shut down your pool...
}
This example is quite simplistic and only implements part of your example problem. But generally speaking, the ForkJoin framework allows you to create tree-like structures of tasks where every parent task (such as A, B and P) controls the execution of its immediate child tasks.
Upvotes: 6
Reputation: 26723
If you want to reinvent the wheel and develop the solution yourself, fine - it's your choice. Doing this properly is rather hard, though, especially the threading part. However, if you can accept some outside help with at least the building blocks, those can be:
- Guava's ListenableFuture - using this library you can create Callables and feed them to a special thread pool executor, which then allows custom callbacks on ListenableFuture completion (a sketch follows this list).
- RxJava's Observable, which allows mixing and matching different tasks. This uses a non-imperative coding style, so beware!
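For the ListenableFuture route, a minimal sketch might look like the following; it only wires the question's task A to its successor N, and it assumes a recent Guava version (where Futures.addCallback takes an explicit executor):

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;

public class ListenableFutureSketch {
    public static void main(String[] args) {
        // Decorate a regular pool so that submit() returns ListenableFutures.
        ListeningExecutorService pool =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

        ListenableFuture<String> taskA = pool.submit(() -> {
            // ... task A's work ...
            return "A done";
        });

        // Start successor N only when A has completed successfully.
        Futures.addCallback(taskA, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                pool.submit(() -> System.out.println("N running after: " + result));
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace(); // A failed, so N is never started
            }
        }, pool);

        // Remember to shut the pool down once the whole job is finished.
    }
}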
Upvotes: 3
Reputation: 585
There are 2 options in terms of pattern usage, and in essence they are pretty similar. In either case, a cyclic dependency (e.g. A -> B -> A) needs to be handled as an error in the task dependency configuration.
1. Mediator: the graph of task dependencies is read when the execution engine starts and becomes the data structure the Mediator works on. After each task[i] finishes, it notifies the Mediator, and the Mediator then notifies all successor tasks of task[i].
2. Message bus: the graph of task dependencies is read as the engine starts, and each task subscribes to the completion message of its predecessor task[i] on topic[i]. When each task[i] finishes, it sends a completion message to the MessageBus on topic[i]; every task subscribed to topic[i] gets notified and starts its work (a sketch of this variant follows this list).
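A minimal sketch of the message-bus variant with a hand-rolled in-memory bus; all class names here are made up, and in a real system you would probably reach for an existing event-bus or messaging library:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// A tiny in-memory message bus: each task publishes its completion to its own topic.
class MessageBus {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void publish(String topic, String message) {
        subscribers.getOrDefault(topic, Collections.emptyList())
                   .forEach(listener -> listener.accept(message));
    }
}

// A task that starts once every predecessor topic it subscribed to has published completion.
class BusTask {
    private final String id;
    private final Runnable body;
    private final MessageBus bus;
    private final ExecutorService pool;
    private final AtomicInteger remainingPredecessors = new AtomicInteger(0);

    BusTask(String id, Runnable body, MessageBus bus, ExecutorService pool) {
        this.id = id;
        this.body = body;
        this.bus = bus;
        this.pool = pool;
    }

    // Subscribe to a predecessor's completion topic, e.g. topic "A" for task A.
    void dependOn(String predecessorTopic) {
        remainingPredecessors.incrementAndGet();
        bus.subscribe(predecessorTopic, message -> {
            if (remainingPredecessors.decrementAndGet() == 0) {
                start(); // the last predecessor just completed
            }
        });
    }

    // Tasks with no predecessors are started directly by the engine.
    void start() {
        pool.execute(() -> {
            body.run();
            bus.publish(id, id + " completed"); // wake up my successors
        });
    }
}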
Upvotes: 4
Reputation: 9913
Take a look at Akka - http://akka.io
Using Akka you create actors (event-driven, concurrent entities that process messages asynchronously):
- each task can be represented as an actor (you choose when to fire it up)
- you can trigger other actors (tasks) on partial completion or full completion (actually you can trigger them whenever you want); a sketch follows
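A minimal sketch using Akka's classic Java actor API; the actor classes and message strings are made up for illustration, and it only shows task A letting its successor M start on partial completion:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class AkkaTasksSketch {

    // Messages exchanged between task actors.
    static final String START = "start";
    static final String PARTIALLY_DONE = "partiallyDone";

    // Task A: does some work, then tells its successor it has partially completed.
    static class TaskA extends AbstractActor {
        private final ActorRef successorM;

        TaskA(ActorRef successorM) {
            this.successorM = successorM;
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .matchEquals(START, msg -> {
                    // ... first part of A's work ...
                    successorM.tell(PARTIALLY_DONE, getSelf()); // M may start now
                    // ... remaining part of A's work ...
                })
                .build();
        }
    }

    // Task M: starts its work as soon as its predecessor reports partial completion.
    static class TaskM extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .matchEquals(PARTIALLY_DONE, msg ->
                    System.out.println("M started after partial completion of A"))
                .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("job");
        ActorRef m = system.actorOf(Props.create(TaskM.class), "M");
        ActorRef a = system.actorOf(Props.create(TaskA.class, m), "A");
        a.tell(START, ActorRef.noSender());
    }
}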
Upvotes: 7
Reputation: 562
If I understood your need correctly, you could use a workflow engine like Activiti to solve it. I think that would be easier than re-inventing a workflow engine for your specific need.
Upvotes: 3