Reputation: 134
I'm doing a tree search for a class assignment. I understand the tree search part, but as I have some extra time, I wanted to speed it up by adding more threads.
The final task is to take in a set of constraints, classes, and time slots, and output a schedule that contains all those classes and satisfies all the constraints. An empty or partial assignment goes in, a complete class assignment comes out.
Our search is designed like a tree, with the input being the root node. The function div(n) is as follows: for a node n, find an unused class C, and for each unused slot S, produce a child node with C in S. To make the search more efficient, we use a search control which ranks the quality of nodes, so that the best candidates are selected first and we don't waste time on bad candidates.
A node implements Comparable, with compareTo() implemented using the search control. I use a priority queue to store nodes awaiting processing, so the 'best' nodes are always next in line. A worker removes a node, applies div(), and adds the children to the priority queue.
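To make the setup concrete, here is a minimal single-threaded sketch of that loop. ToyNode, its penalty field, and BestFirstDemo are hypothetical stand-ins for the real Schedule and search control, which are not shown in full here; a lower penalty is assumed to mean a better node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for Schedule: depth = classes placed so far,
// penalty = the search control's ranking (lower is better).
final class ToyNode implements Comparable<ToyNode> {
    final int depth;
    final int penalty;

    ToyNode(int depth, int penalty) {
        this.depth = depth;
        this.penalty = penalty;
    }

    // One child per slot; placing into slot i costs i extra penalty in this toy model.
    List<ToyNode> div(int branching, int maxDepth) {
        List<ToyNode> children = new ArrayList<>();
        if (depth < maxDepth) {
            for (int slot = 0; slot < branching; slot++) {
                children.add(new ToyNode(depth + 1, penalty + slot));
            }
        }
        return children;
    }

    @Override
    public int compareTo(ToyNode other) {
        return Integer.compare(penalty, other.penalty);
    }
}

final class BestFirstDemo {
    // Pops the best node, expands it, and pushes the children back,
    // until a complete node (depth == maxDepth) comes off the queue.
    static ToyNode search(int branching, int maxDepth) {
        PriorityQueue<ToyNode> queue = new PriorityQueue<>();
        queue.add(new ToyNode(0, 0));
        ToyNode next;
        while ((next = queue.poll()) != null) {
            if (next.depth == maxDepth) {
                return next;
            }
            queue.addAll(next.div(branching, maxDepth));
        }
        return null; // search space exhausted without a complete node
    }
}
```

Calling BestFirstDemo.search(3, 4) returns a node at depth 4. Because penalties never decrease along a path in this toy model, the first complete node popped is also a cheapest one.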
My first approach was using a shared priority queue, specifically PriorityBlockingQueue. The performance was abysmal, since the queue was almost always blocking.
I tried to fix it by adding a background worker and a ConcurrentLinkedQueue buffer. Workers would add to the buffer, and the background worker would periodically move elements from the buffer to the priority queue. This didn't work either.
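For reference, the buffering idea might be sketched roughly as follows. BufferedFrontier and its method names are hypothetical, and plain Integer penalties stand in for nodes. The sketch assumes that only one thread ever calls drain() and pollBest(), since the inner PriorityQueue is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class BufferedFrontier {
    // Lock-free buffer that any worker may write to.
    private final ConcurrentLinkedQueue<Integer> buffer = new ConcurrentLinkedQueue<>();
    // Ordered frontier; assumed to be touched by a single thread only.
    private final PriorityQueue<Integer> ordered = new PriorityQueue<>();

    void submit(int penalty) {
        buffer.offer(penalty);
    }

    // Moves everything currently in the buffer into the ordered frontier.
    int drain() {
        int moved = 0;
        Integer item;
        while ((item = buffer.poll()) != null) {
            ordered.add(item);
            moved++;
        }
        return moved;
    }

    Integer pollBest() {
        return ordered.poll();
    }

    // Demo helper: several producer threads fill the buffer concurrently.
    static BufferedFrontier fillFrom(int producers, int perProducer) {
        BufferedFrontier frontier = new BufferedFrontier();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int offset = p * perProducer;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    frontier.submit(offset + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return frontier;
    }
}
```

This only removes contention on the write side. Workers still have to obtain their next node from somewhere, so if they all read from the same ordered frontier, that read path remains a shared point of contention.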
The best performance I have found is to give each worker its own priority queue. I'm guessing that this is as good as it gets, as now threads aren't affected by the actions of others. With this configuration, on a 4C/8T machine, I get a speedup of ~2.5. I think the bottleneck here is the allocation of memory for all these nodes, but I could be wrong here.
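The per-worker arrangement can be sketched in isolation like this. Everything here is a toy assumption rather than the actual assignment code: PerWorkerSearch, the Node class, the fixed branching factor, and the rule that slot i adds i + 1 to the penalty. The only shared state is an AtomicInteger holding the best penalty found so far, which also lets workers prune.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class PerWorkerSearch {
    static final int BRANCHING = 3;
    static final int MAX_DEPTH = 6;

    // Toy node: depth in the tree plus an accumulated penalty (lower is better).
    static final class Node {
        final int depth;
        final int penalty;

        Node(int depth, int penalty) {
            this.depth = depth;
            this.penalty = penalty;
        }
    }

    // Runs the search with the given number of workers and returns the best penalty.
    static int run(int workers) {
        AtomicInteger best = new AtomicInteger(Integer.MAX_VALUE);
        List<PriorityQueue<Node>> queues = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            queues.add(new PriorityQueue<>(Comparator.comparingInt((Node n) -> n.penalty)));
        }
        // Seed: deal the root's children out round-robin, one private queue per worker.
        for (int slot = 0; slot < BRANCHING; slot++) {
            queues.get(slot % workers).add(new Node(1, slot + 1));
        }
        List<Thread> threads = new ArrayList<>();
        for (PriorityQueue<Node> queue : queues) {
            Thread t = new Thread(() -> search(queue, best));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return best.get();
    }

    private static void search(PriorityQueue<Node> queue, AtomicInteger best) {
        Node next;
        while ((next = queue.poll()) != null) {
            if (next.penalty >= best.get()) {
                continue; // cannot beat the shared best; penalties only grow
            }
            if (next.depth == MAX_DEPTH) {
                best.accumulateAndGet(next.penalty, Math::min);
                continue;
            }
            for (int slot = 0; slot < BRANCHING; slot++) {
                queue.add(new Node(next.depth + 1, next.penalty + slot + 1));
            }
        }
    }
}
```

In this toy model PerWorkerSearch.run(4) and PerWorkerSearch.run(1) both return 6. Note that seeding only from the root's children leaves a worker idle whenever there are more workers than children, so a real version would probably want to expand a few levels before dealing nodes out.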
From Searcher:
private PriorityQueue<Schedule> workQueue;
private static volatile boolean shutdownSignal = false;
private Schedule best;
public Searcher(List<Schedule> instances) {
workQueue = new PriorityQueue<>(instances);
}
public static void stop() {
shutdownSignal = true;
}
/**
* Run the search control starting with the first node in the workQueue
*/
@Override
public void run() {
while (!shutdownSignal) {
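// poll() returns null on an empty queue, whereas remove() throws
// NoSuchElementException, which the old catch-all swallowed in a busy loop.
Schedule next = workQueue.poll();
if (next == null) {
break; // this worker's queue is exhausted
}
List<Schedule> children = next.div(checkBest);
workQueue.addAll(children);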
}
//For testing
System.out.println("Shutting down: " + workQueue.size());
}
//passing a function as a parameter
Consumer<Schedule> checkBest = new Consumer<Schedule>() {
public void accept(Schedule sched) {
if (best == null || sched.betterThan(best)) {
best = sched;
Model.checkBest.accept(sched);
}
}
};
From Schedule:
public List<Schedule> div(Consumer<Schedule> completion) {
List<Schedule> n = new ArrayList<>();
int selected = 0;
List<Slot> available = Model.getSlots();
List<Slot> allocated = getAssigned();
while (allocated.get(selected) != null) {
selected++;
} // find first available slot to fill.
// Iterate through all available slots
for (Slot t : available) {
//Prepare a fresh copy. The copy constructor is needed here: new ArrayList<>(n) only
//sets capacity, so Collections.copy into it throws IndexOutOfBoundsException.
List<Slot> newAssignment = new ArrayList<>(allocated);
//assign the course to the timeslot
newAssignment.set(selected, t);
Schedule next = new Schedule(this, newAssignment);
n.add(next);
}
/**
* Filter out nodes which violate the hard constraints and which are solved,
* and check if they are the best in a calling thread
*/
List<Schedule> unsolvedNodes = new ArrayList<>();
for (Schedule schedule: n) {
if (schedule.constr() && !schedule.solved()){
unsolvedNodes.add(schedule);
completion.accept(schedule);
}
}
return unsolvedNodes;
}
Upvotes: 2
Views: 228
Reputation: 2253
I would say that the fork-join framework is an appropriate tool for your task. You need to extend your task from either RecursiveTask or RecursiveAction and submit it to a ForkJoinPool. Here is a pseudo-code sample. Also, your shutdown flag must be volatile.
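import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Node, div() and getRootNode() are placeholders for your own types and methods.
public class Task extends RecursiveAction {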
private final Node<Integer> node;
public Task(Node<Integer> node) {
this.node = node;
}
@Override
protected void compute() {
// check result and stop recursion if needed
List<Task> subTasks = new ArrayList<>();
List<Node<Integer>> nodes = div(this.node);
for (Node<Integer> node : nodes) {
Task task = new Task(node);
task.fork();
subTasks.add(task);
}
for(Task task : subTasks) {
task.join();
}
}
public static void main(String[] args) {
Node<Integer> root = getRootNode();
new ForkJoinPool().invoke(new Task(root));
}
}
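A self-contained variant of the same pattern that actually runs might look like the sketch below. It uses RecursiveTask so that each subtree returns a value. The tree itself is a made-up toy (fixed branching, slot i adds i + 1 to a penalty), and MinPenaltyTask is a hypothetical name, not part of the original problem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Finds the minimum penalty over all leaves of a toy search tree.
final class MinPenaltyTask extends RecursiveTask<Integer> {
    private static final int BRANCHING = 3;
    private static final int MAX_DEPTH = 6;

    private final int depth;
    private final int penalty;

    MinPenaltyTask(int depth, int penalty) {
        this.depth = depth;
        this.penalty = penalty;
    }

    @Override
    protected Integer compute() {
        if (depth == MAX_DEPTH) {
            return penalty; // complete assignment: report its penalty
        }
        List<MinPenaltyTask> subTasks = new ArrayList<>();
        for (int slot = 0; slot < BRANCHING; slot++) {
            subTasks.add(new MinPenaltyTask(depth + 1, penalty + slot + 1));
        }
        invokeAll(subTasks); // forks the subtasks and waits for all of them
        int best = Integer.MAX_VALUE;
        for (MinPenaltyTask task : subTasks) {
            best = Math.min(best, task.join()); // already done, so join() just reads the result
        }
        return best;
    }

    static int solve() {
        return ForkJoinPool.commonPool().invoke(new MinPenaltyTask(0, 0));
    }
}
```

Here MinPenaltyTask.solve() returns 6. One trade-off to keep in mind: the pool schedules tasks through work-stealing deques rather than by node quality, so this version visits the whole tree and does not preserve the best-first ordering that the priority queue gave you.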
Upvotes: 1