Reputation: 397
I have two arrays, abc[100] and def[1000], and I have to compute an array xyz[100], where xyz[i] = minDistance(abc[i], def); i.e. for every element in abc I have to find the nearest element in def and store it in xyz.
For this I am using threads at two levels. At the first level I create a thread for every 10 points in abc, and at the second level each of those creates child threads for every 100 points in def.
My implementation is below.
My questions are:
How can I wait for the child threads of abc (i.e. the def threads)? I have gone through the Java join method but could not figure out how to use it here.
Can I use a CyclicBarrier in this case?
The actual data is on the order of 1000s of points for abc and 10000 for def, and I haven't used threads before, so are there any issues that could arise with this implementation? Also, I have seen ThreadPoolExecutor used instead of the fixed thread pool in some examples, but couldn't figure out how many threads a ThreadPoolExecutor will have.
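1. DistanceCalculation
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MinDistanceCalculation {
    public static List<double[]> xyz = new Vector<double[]>();

    public void method1() {
        double[][] abc = new double[100][7];
        double[][] def = new double[1000][7];
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // One task per block of 10 points in abc: [i, i + 10)
        for (int i = 0; i < abc.length; i += 10) {
            executorService.execute(new MainThread(abc, i, Math.min(i + 10, abc.length), def));
        }
        // Accept no new tasks; the ones already submitted still run
        executorService.shutdown();
    }
}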
2. Main Thread / abc Threads
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainThread implements Runnable {
    double[][] abc = null;
    double[][] def = null;
    int startPos = 0;
    int endPos = 0;

    public MainThread(double[][] abc, int startPos, int endPos, double[][] def) {
        this.abc = abc;
        this.def = def;
        this.startPos = startPos;
        this.endPos = endPos;
    }

    @Override
    public void run() {
        for (int i = startPos; i < endPos; i++) {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            List<Future<double[]>> minDistancePoints = new ArrayList<Future<double[]>>();
            // One child task per block of 100 points in def: [j, j + 100)
            for (int j = 0; j < def.length; j += 100) {
                Future<double[]> minDistancePoint = executorService.submit(
                        new ChildThread(abc[i], def, j, Math.min(j + 100, def.length)));
                minDistancePoints.add(minDistancePoint);
            }
            // How can I wait for all the threads and calculate the MinDistance and
            // add it to the actual array
            findMinDistanceOfAll(abc[i], minDistancePoints);
            executorService.shutdown();
        }
    }

    public void findMinDistanceOfAll(double[] mainPoint, List<Future<double[]>> distancePoints) {
        // Here I will find the min among the given points and add it to the actual array.
        MinDistanceCalculation.xyz.add(null);
    }
}
3. Child Thread / def threads
import java.util.concurrent.Callable;

public class ChildThread implements Callable<double[]> {
    double[] abc = null;
    double[][] def = null;
    int from;
    int to;

    public ChildThread(double[] abc, double[][] def, int from, int to) {
        this.def = def;
        this.abc = abc;
        this.from = from;
        this.to = to;
    }

    @Override
    public double[] call() throws Exception {
        double minDistance = Double.MAX_VALUE;
        double currentDistance = 0;
        double[] minCandidate = null;
        // Scan def[from, to) and remember the point closest to abc
        for (int i = from; i < to; i++) {
            currentDistance = distance(abc, def[i]);
            if (currentDistance < minDistance) {
                minDistance = currentDistance;
                minCandidate = def[i];
            }
        }
        return minCandidate;
    }

    public double distance(double[] point1, double[] point2) {
        // Calculates and Returns Euclidean distance
        return 0;
    }
}
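The distance method above is only a stub that returns 0. A minimal sketch of what a Euclidean distance between two points of equal dimension could look like is shown here; the class name EuclideanSketch and the length check are assumptions made for illustration and are not part of the posted code.

```java
public class EuclideanSketch {
    // Euclidean distance between two points with the same number of coordinates.
    // (Hypothetical helper; the original post leaves this method unimplemented.)
    static double distance(double[] point1, double[] point2) {
        if (point1.length != point2.length) {
            throw new IllegalArgumentException("points must have the same dimension");
        }
        double sum = 0;
        for (int d = 0; d < point1.length; d++) {
            double diff = point1[d] - point2[d];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        // 3-4-5 triangle
        System.out.println(distance(new double[]{0, 0}, new double[]{3, 4})); // prints 5.0
    }
}
```

When distances are only compared to find a minimum, the square root can be skipped, since comparing squared distances gives the same ordering.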
Upvotes: 0
Views: 2487
Reputation: 13535
Determine what a parallel task should do. Parallelization works best when tasks interact as little as possible, so computing one element of the xyz array is the best candidate. Splitting def into 10 chunks is a poor choice because those chunks are not independent: their partial results have to be combined again for every element of abc. Combining 10 elements of abc in one thread may make sense when we want to increase the size of a task and so reduce interaction between tasks, but this is a non-obvious optimization and should be done later.
Decide how to run these tasks. Wrapping each task in a separate Runnable and submitting it to a thread pool is a universal way, but here we can avoid this. Each task is identified by its index into the abc array (and the xyz array). We can keep the current index in an AtomicInteger and use getAndIncrement to obtain the next index.
Since this task is CPU-bound, start N threads, where N is the number of available processors.
Count the number of finished tasks with a CountDownLatch.
Add some initialization and the min distance calculation here:
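import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class MinDistanceCalculation implements Runnable {
    AtomicInteger idx = new AtomicInteger();
    int inpSize = 100;
    double[] abc = new double[inpSize];
    double[] def = ...
    double[] xyz = new double[inpSize];
    CountDownLatch counter = new CountDownLatch(inpSize);

    public void run() {
        for (;;) {
            // Claim the next unprocessed index; stop when none are left
            int nextIndex = idx.getAndIncrement();
            if (nextIndex >= inpSize) return;
            xyz[nextIndex] = minDistance(abc[nextIndex], def);
            counter.countDown();
        }
    }

    void start() throws InterruptedException {
        // One worker thread per available processor
        for (int k = 0; k < Runtime.getRuntime().availableProcessors(); k++) {
            new Thread(this).start();
        }
        // Block until every element of xyz has been computed
        counter.await();
    }

    public static void main(String[] a) throws InterruptedException {
        new MinDistanceCalculation().start();
    }
}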
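For reference, here is a self-contained sketch of the same idea that can be run as is. It is not the only way to do it and it rests on a few assumptions: points are double[][] as in the question, xyz stores the index of the nearest def point rather than the point itself, and the names NearestPointSketch, nearestAll, nearestIndex and squaredDistance are made up for this example. It also deviates from the code above in one respect: the latch counts worker threads instead of tasks, so a task that throws cannot leave await() blocked forever.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class NearestPointSketch {
    // Squared Euclidean distance; it orders points the same way as the true distance.
    static double squaredDistance(double[] p, double[] q) {
        double sum = 0;
        for (int d = 0; d < p.length; d++) {
            double diff = p[d] - q[d];
            sum += diff * diff;
        }
        return sum;
    }

    // Index of the point in def closest to the given point (first one wins on ties).
    static int nearestIndex(double[] point, double[][] def) {
        int best = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int j = 0; j < def.length; j++) {
            double dist = squaredDistance(point, def[j]);
            if (dist < bestDist) {
                bestDist = dist;
                best = j;
            }
        }
        return best;
    }

    // Computes xyz[i] = index of the def point nearest to abc[i], using one
    // worker thread per available processor and a shared index counter.
    static int[] nearestAll(double[][] abc, double[][] def) throws InterruptedException {
        int[] xyz = new int[abc.length];
        AtomicInteger idx = new AtomicInteger();
        int workers = Runtime.getRuntime().availableProcessors();
        CountDownLatch done = new CountDownLatch(workers);
        for (int k = 0; k < workers; k++) {
            new Thread(() -> {
                try {
                    int i;
                    // Each worker keeps claiming the next unprocessed index.
                    while ((i = idx.getAndIncrement()) < abc.length) {
                        xyz[i] = nearestIndex(abc[i], def);
                    }
                } finally {
                    done.countDown(); // counted even if a task throws
                }
            }).start();
        }
        done.await(); // returns once every worker has finished
        return xyz;
    }

    public static void main(String[] args) throws InterruptedException {
        double[][] abc = {{0, 0}, {5, 5}, {9, 1}};
        double[][] def = {{1, 1}, {6, 5}, {10, 0}, {-3, 4}};
        System.out.println(Arrays.toString(nearestAll(abc, def))); // prints [0, 1, 2]
    }
}
```

Each worker writes to a distinct slot of xyz, so no locking is needed on the array, and the countDown/await pair makes those writes visible to the thread that reads the result.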
Upvotes: 1