Reputation: 337
I've got a little bit of work that is easily parallelizable, and I want to use Java threads to split up the work across my four core machine. It's a genetic algorithm applied to the traveling salesman problem. It doesn't sound easily parallelizable, but the first loop is very easily so. The second part where I talk about the actual evolution may or may not be, but I want to know if I'm getting slow down because of the way I'm implementing threading, or if its the algorithm itself.
Also, if anyone has better ideas on how I should be implementing what I'm trying to do, that would be very much appreciated.
In main(), I have this:
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(numThreads*numIter);
ThreadPoolExecutor tpool = new ThreadPoolExecutor(numThreads, numThreads, 10, TimeUnit.SECONDS, queue);
barrier = new CyclicBarrier(numThreads);
k.init(tpool);
I have a loop that is done inside of init() and looks like this:
for (int i = 0; i < numCities; i++) {
x[i] = rand.nextInt(width);
y[i] = rand.nextInt(height);
}
That I changed to this:
int errorCities = 0, stepCities = 0;
stepCities = numCities/numThreads;
errorCities = numCities - stepCities*numThreads;
// Split up work, assign to threads
for (int i = 1; i <= numThreads; i++) {
int startCities = (i-1)*stepCities;
int endCities = startCities + stepCities;
// This is a bit messy...
if(i <= numThreads) endCities += errorCities;
tpool.execute(new citySetupThread(startCities, endCities));
}
And here is citySetupThread() class:
public class citySetupThread implements Runnable {
int start, end;
public citySetupThread(int s, int e) {
start = s;
end = e;
}
public void run() {
for (int j = start; j < end; j++) {
x[j] = ThreadLocalRandom.current().nextInt(0, width);
y[j] = ThreadLocalRandom.current().nextInt(0, height);
}
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
}
}
The above code is run once in the program, so it was sort of a test case for my threading constructs (this is my first experience with Java threads). I implemented the same sort of thing in a real critical section, specifically the evolution part of the genetic algorithm, whose class is as follows:
public class evolveThread implements Runnable {
int start, end;
public evolveThread(int s, int e) {
start = s;
end = e;
}
public void run() {
// Get midpoint
int n = population.length/2, m;
for (m = start; m > end; m--) {
int i, j;
i = ThreadLocalRandom.current().nextInt(0, n);
do {
j = ThreadLocalRandom.current().nextInt(0, n);
} while(i == j);
population[m].crossover(population[i], population[j]);
population[m].mutate(numCities);
}
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
}
}
Which exists in a function evolve() that is called in init() like so:
for (int p = 0; p < numIter; p++) evolve(p, tpool);
Yes I know that's not terribly good design, but for other reasons I'm stuck with it. Inside of evolve is the relevant parts, shown here:
// Threaded inner loop
int startEvolve = popSize - 1,
endEvolve = (popSize - 1) - (popSize - 1)/numThreads;
// Split up work, assign to threads
for (int i = 0; i < numThreads; i++) {
endEvolve = (popSize - 1) - (popSize - 1)*(i + 1)/numThreads + 1;
tpool.execute(new evolveThread(startEvolve, endEvolve));
startEvolve = endEvolve;
}
// Wait for our comrades
try {
barrier.await();
} catch (InterruptedException ie) {
return;
} catch (BrokenBarrierException bbe) {
return;
}
population[1].crossover(population[0], population[1]);
population[1].mutate(numCities);
population[0].mutate(numCities);
// Pick out the strongest
Arrays.sort(population, population[0]);
current = population[0];
generation++;
What I really want to know is this:
What role does the "queue" have? Am I right to create a queue for as many jobs as I think will be executed for all threads in the pool? If the size isn't sufficiently large, I get RejectedExecutionException's. I just decided to do numThreads*numIterations because that's how many jobs there would be (for the actual evolution method that I mentioned earlier). It's weird though.. I shouldn't have to do this if the barrier.await()'s were working, which leads me to...
Am I using the barrier.await() correctly? Currently I have it in two places: inside the run() method for the Runnable object, and after the for loop that executes all the jobs. I would've thought only one would be required, but I get errors if I remove one or the other.
I'm suspicious of contention for the threads, as that is the only thing I can glean from the absurd slowdown (which does scale with the input parameters). I want to know if it is anything to do with how I'm implementing the thread pool and barriers. If not, then I'll have to look inside the crossover() and mutate() methods, I suppose.
Upvotes: 5
Views: 2999
Reputation: 533570
As you increase the number of tasks, you increase the overhead using each task adds. This means you want to minimise the number of tasks i.e. the same as the number of cpus you have. For some tasks using double the number of cpus can be better when the work load is not even.
BTW: You don't need a barrier in each task, you can wait for the future of each task to complete by calling get()
on each one.
Upvotes: 1
Reputation: 1344
First, I think you may have a bug with how you intended to use the CyclicBarrier. Currently you are initializing it with the number of executor threads as the number of parties. You have an additional party, however; the main thread. So I think you need to do:
barrier = new CyclicBarrier(numThreads + 1);
I think this should work, but personally I find it an odd use of the barrier.
When using a worker-queue thread-pool model I find it easier to use a Semaphore or Java's Future model.
For a semaphore:
class MyRunnable implements Runnable {
private final Semaphore sem;
public MyRunnable(Semaphore sem) {
this.sem = sem;
}
public void run() {
// do work
// signal complete
sem.release()
}
}
Then in your main thread:
Semaphore sem = new Semaphore(0);
for (int i = 0; i < numJobs; ++i) {
threadPool.execute(new MyRunnable(sem));
}
sem.acquire(numJobs);
Its really doing the same thing as the barrier, but I find it easier to think about the worker tasks "signaling" that they are done instead of "sync'ing up" with the main thread again.
For example, if you look at the example code in the CyclicBarrier JavaDoc the call to barrier.await()
is inside the loop inside the worker. So it is really synching up the multiple long running worker threads and the main thread is not participating in the barrier. Calling barrier.await()
at the end of the worker outside the loop is more signaling completion.
Upvotes: 4