Reputation: 306
I've implemented a directed graph using Java. It's for a project planner and one node represents a task with different properties. I've successfully implemented topological sort, but I need a way to run/execute parallel tasks as soon dependencies for a task are completed.
Here is my implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.*;
public class Task implements Comparable<Task> {
int number;
String name;
int time;
int staff;
int earliestStart, latestStart;
List<Integer> dependencies;
List<Task> outEdges;
int cntPredecessors;
Status status;
public enum Status {UNVISITED,RUNNING,VISITED};
@Override
public String toString() {
return "Task{" +
"number=" + number +
", name='" + name + '\'' +
", time=" + time +
", staff=" + staff +
", dependencies=" + dependencies +
'}';
}
public Task(int number, String name, int time, int staff) {
setNumber(number);
setName(name);
setTime(time);
setStaff(staff);
dependencies=new ArrayList<>();
outEdges=new ArrayList<>();
status = Status.UNVISITED;
}
public int getNumber() {
return number;
}
public void setNumber(int number) {
this.number = number;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getTime() {
return time;
}
public void setTime(int time) {
this.time = time;
}
public int getStaff() {
return staff;
}
public void setStaff(int staff) {
this.staff = staff;
}
public List<Integer> getDependencies() {
return dependencies;
}
public void setDependencies(List<Integer> dependencies) {
this.dependencies = dependencies;
}
public List<Task> getOutEdges() {return outEdges; }
public void setOutEdge(Task t) {outEdges.add(t); }
public int getIndegrees() { return cntPredecessors; }
public void setIndegree() { cntPredecessors = dependencies.size();}
public Status getStatus() {return this.status; }
public Task findTaskWithNoInDegree() {
if (this.cntPredecessors == 0) return this;
return null;
}
public int compareTo(Task other) {
return Integer.compare(this.time, other.time);
}
} //END class Task
// The class Main represents the Task objects in a graph
import java.util.*;
public class Main {
static int maxnr = 0;
public static void main(String[] args) {
Map<Integer, Task> map=new HashMap<>();
Scanner scanner = new Scanner(Main.class.getResourceAsStream("/res/house.txt"), "UTF-8");
Main mainObject = new Main();
map = mainObject.fromScanner(scanner);
System.out.println("DEBUG: maxnr " + maxnr);
mainObject.setInDegrees(map);
mainObject.setOutEdges(map);
//System.out.println("DEBUG: Size of outEdges for Task 1 is : " + map.get(1).getOutEdges().size());
//System.out.println("DEBUG: Indegrees for Task 8 is : " + map.get(8).getIndegrees());
mainObject.topSort(maxnr,map);
for(Integer k:map.keySet()) {
//System.out.println("DEBUG outEdges for Task number " + map.get(k).getNumber() + " " + map.get(k).getOutEdges());
}
} // END of void main(String[] args)
public void setInDegrees(Map<Integer, Task> map) {
for(Integer k:map.keySet()) {
Task task = map.get(k);
task.setIndegree();
}
}
public void setOutEdges(Map<Integer, Task> map) {
for(Integer k:map.keySet()) {
// map.get(k).setIndegrees();
for(Integer dep:map.get(k).getDependencies()) {
//System.out.println("DEBUG: "+ dep);
//System.out.print(" DEBUG: Name is " + map.get(dep).getName());
map.get(dep).setOutEdge(map.get(k));
}
//System.out.println(map.get(k));
}
} //END of setOutEdges()
// toplogical sort # Big O(|V| +|E|) for indegree calc and since the code only looks at each edge once!
// S is Set of all nodes with no incoming edges
public void topSort(int maxnr, Map<Integer, Task> map) {
ArrayList<Task> L = new ArrayList<Task>(maxnr);
//LinkedList<Task> L = new LinkedList<>();
//HashSet<Task> S = new HashSet<>(maxnr);
LinkedList<Task> S = new LinkedList<>();
for(Integer n:map.keySet()) {
if(map.get(n).getIndegrees() == 0) {
S.add(map.get(n));
}
}
System.out.println("DEBUG: Set S is " + S);
//HashSet<Task> S2 = new HashSet<>(S);
Task t;
int counter= 0;
while(!S.isEmpty()) {
//System.out.print("Topsort: Task and S. " + t.getNumber());
t = S.iterator().next();
S.remove(t);
//System.out.print("Topsort : " + t.getNumber());
L.add(t);
//System.out.println("Starting " + t.getNumber());
counter++;
for(Iterator<Task> it = t.outEdges.iterator(); it.hasNext();) {
Task w = it.next();
w.cntPredecessors--;
if (w.getIndegrees() == 0) {
S.add(w);
// System.out.println("Starting " + w.getNumber());
}
}
}
System.out.println();
if (counter < maxnr) {
System.out.println("Cycle detected, topsort not possible");
} else {
//System.out.println("Topsort : " + Arrays.toString(L.toArray()));
Iterator<Task> topsortIt = L.iterator();
System.out.print("\n Topsort list is: ");
while (topsortIt.hasNext()) {
System.out.print(" " + topsortIt.next().getNumber());
}
System.out.println();
}
} //END of topSort()
public Map fromScanner(Scanner scanner) {
Map<Integer, Task> map=new HashMap<>();
maxnr = scanner.nextInt();
while (scanner.hasNextLine()) {
String line=scanner.nextLine();
if (line.isEmpty() ) continue;
Scanner s2=new Scanner(line);
Task task = new Task(s2.nextInt(), s2.next(), s2.nextInt(), s2.nextInt());
while (s2.hasNextInt()) {
int i = s2.nextInt();
if (i != 0) {
task.getDependencies().add(i);
}
}
map.put(task.getNumber(), task);
}
return map;
} //END of fromScanner()
} //END of class Main
Content of house.txt: First line (number) is the max nodes/Tasks. The columns are: Task number, name, time required to complete, manpower requirements, dependency edges (terminated by 0).
8
1 Build-walls 4 2 5 0
2 Build-roofs 6 4 1 0
3 Put-on-wallpapers 1 2 1 2 0
4 Put-on-tiles 1 3 2 0
5 Build-foundation 4 2 0
6 Make-floor 2 2 5 0
7 Put-carpet-floor 4 2 6 2 0
8 Move-in 4 4 3 7 0
Tasks with no incoming edges (i.e no dependencies) should be started first. The execution of Tasks should be printed for instance like for the give input above:
Time: 0 Starting: 5 // Task 5 only one with no dependencies
Current staff: 2
Time: 4 Finished: 5
Starting: 6
Starting: 1
Current staff: 4 // sum of manpower from Task 6 and 1 => 2 + 2 = 4
Time: 6 Finished: 6
Current staff: 2
Time: 8 Finished: 1
Finished: 1
Starting: 2
Starting: 3
Current staff: 6
etc.
Upvotes: 5
Views: 2674
Reputation: 11
There are several methods.
You can use Java 8 CompletableFuture
or Guava ListenableFuture
to implement it.
Your task should implement Runnable
or Callable
. when you execute current task, you should execute its preTasks
recursively (if task is composite mode, it will know it's predecessor) and make sure they are done(through flag to express status or something).
Like so:
CompletableFuture.allOf(predecessor).thenRunAsync(current)...
Futures.allAsList(predecessor)...
Upvotes: 1
Reputation: 957
If it helps anyone, this is exactly the use case of JavaRed Library - scheduling and running async tasks in a flow defined as a graph.
In short, a graph flow as complex as this one:
can be simply implemented with:
Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA());
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a));
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a));
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a));
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c));
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e));
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f));
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g));
More about it in the project wiki
Upvotes: 2
Reputation: 357
To schedule your tasks you can either maintain a thread pool by yourself, or get a library to do it for you. If you maintain thread pool yourself, it's very simple (forgive the poor pseudocode):
assume you already have tasks topoligically sorted : t0,t1,t2,....,tn
int next_task=0; //global pointer to next task
then in each thread, you do:
while (true) {
atomic {
if (next_task > n) break;
t = get_task (next_task);
next_task = next_task + 1;
}
run task t;
}
This way you get all tasks executed in an order that respects their dependencies, and each thread will jump onto the next task as soon as it finishes the last one.
If you want to have a library to schedule it for you, maybe OpenMP, TBB, or look at this thread How to Implement a DAG-like Scheduler in Java??
Hope this helps.
Upvotes: 0