Abhilash

Reputation: 397

Multiple threads spawning multiple threads, and how to wait for the child threads

I have two arrays, abc[100] and def[1000], and I have to compute an array xyz[100] where xyz[i] = minDistance(abc[i], def). That is, for every element in abc I have to find the nearest element in def and store it in xyz.

For this I am using threads at two levels. At the first level I create one thread for every 10 points in abc, and at the second level each of those threads creates one child thread for every 100 points in def. Below is my implementation.

My questions are

1. DistanceCalculation

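import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MinDistanceCalculation {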
public static List<double[]> xyz = new Vector<double[]>(); 

public void method1(){      
    double[][] abc = new double[100][7];
    double[][] def = new double[1000][7];

    ExecutorService executorService = Executors.newFixedThreadPool(10);     
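    // One task per block of 10 points in abc: [i, i + 10).
    // (The original step i = i*10 never advances from 0.)
    for(int i = 0 ; i < abc.length ; i = i + 10){
        executorService.execute(new MainThread(abc, i, i + 10, def));
    }
    executorService.shutdown(); // accept no new tasks; submitted ones still run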
}
}

2. Main Thread / abc Threads

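import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MainThread implements Runnable{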

double[][] abc = null;
double[][] def = null;
int startPos  = 0;
int endPos = 0;

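public MainThread(double[][] abc , int startPos , int endPos, double[][] def){
    this.abc = abc;
    this.def = def;
    // Without these two assignments both positions stay 0 and run() does nothing
    this.startPos = startPos;
    this.endPos = endPos;
}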

@Override
public void run() {     

    for(int i = startPos ; i < endPos ; i++){
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<double[]>> minDistancePoints = new ArrayList<Future<double[]>>();

        // One child task per block of 100 points in def: [j, j + 100)
        for(int j = 0 ; j < def.length ; j = j + 100){
            Future<double[]> minDistancePoint = executorService.submit(new ChildThread(abc[i], def, j, j + 100));
            minDistancePoints.add(minDistancePoint);
        }
        // How can I wait for all the threads and calculate the MinDistance and
        // add it to the actual array
        findMinDistanceOfAll(abc[i], minDistancePoints);
        executorService.shutdown();
    }
}

public void findMinDistanceOfAll(double[] mainPoint, List<Future<double[]>>  distancePoints){
    // Here  I will find the min among the given points and add it actual array.
    MinDistanceCalculation.xyz.add(null);
}

}

3. Child Thread / def threads

import java.util.concurrent.Callable;

public class ChildThread implements Callable<double[]> {

double[] abc = null;
double[][] def = null;
int from;
int to;

public ChildThread(double[] abc, double[][] def, int from, int to) {
    this.def = def;
    this.abc = abc;
    this.from = from;
    this.to = to;
}

@Override
public double[] call() throws Exception {

    double minDistance = Double.MAX_VALUE;
    double currentDistance = 0;
    double[] minCandidate = null;

    for (int i = from; i < to; i++) {
        currentDistance = distance(abc,def[i]);
        if (currentDistance < minDistance) {
            minDistance = currentDistance;
            minCandidate = def[i];
        }
    }

    return minCandidate;
}

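public double distance(double[] point1 , double[] point2) {
    // Calculates and returns the Euclidean distance
    double sum = 0;
    for (int k = 0; k < point1.length; k++) {
        double diff = point1[k] - point2[k];
        sum += diff * diff;
    }
    return Math.sqrt(sum);
}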
}

Upvotes: 0

Views: 2487

Answers (1)

Alexei Kaigorodov

Reputation: 13535

  • Determine what a parallel task should do. Parallelization works best when tasks interact as little as possible, so calculating one element of the xyz array is the best candidate for a task. Splitting def into 10 chunks is a poor choice because those chunks are not independent: their partial results have to be combined afterwards. Combining 10 elements of abc in one thread may make sense when we want to increase the size of a task and so reduce interaction between tasks, but this is a non-obvious optimization and should be done later.

  • Decide how to run these tasks. Wrapping each task in a separate Runnable and submitting it to a thread pool is the universal way, but here we can avoid it. Each task is identified by its index into the abc array (and the xyz array). We can keep the current index in an AtomicInteger and use getAndIncrement to obtain the next index.

  • Since this task is CPU-bound, start N threads, where N is the number of available processors.

  • Count the number of finished tasks with a CountDownLatch.
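For comparison, the "universal" thread-pool route mentioned in the second point could look roughly like the sketch below. It is only an illustration, not part of the approach recommended here: the class name PooledMinDistance and the helpers nearest and compute are made up for the example, and points are assumed to be double[] rows as in the question. ExecutorService.invokeAll blocks until every submitted task has completed, which also covers the "how do I wait" part of the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical name; one pool task per element of abc.
class PooledMinDistance {

    // Euclidean distance between two points of equal dimension.
    static double distance(double[] p, double[] q) {
        double sum = 0;
        for (int d = 0; d < p.length; d++) {
            double diff = p[d] - q[d];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    // Returns the point in def closest to p (null if def is empty).
    static double[] nearest(double[] p, double[][] def) {
        double[] best = null;
        double bestDist = Double.MAX_VALUE;
        for (double[] candidate : def) {
            double dist = distance(p, candidate);
            if (dist < bestDist) {
                bestDist = dist;
                best = candidate;
            }
        }
        return best;
    }

    // Computes xyz[i] = nearest(abc[i], def) using a fixed-size pool.
    static double[][] compute(double[][] abc, double[][] def) {
        int workers = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<double[]>> tasks = new ArrayList<>();
            for (double[] point : abc) {
                tasks.add(() -> nearest(point, def));
            }
            // invokeAll waits for all tasks and returns futures in task order.
            List<Future<double[]>> results = pool.invokeAll(tasks);
            double[][] xyz = new double[abc.length][];
            for (int i = 0; i < xyz.length; i++) {
                xyz[i] = results.get(i).get();
            }
            return xyz;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        double[][] abc = { {0, 0}, {10, 10} };
        double[][] def = { {1, 1}, {9, 9}, {5, 5} };
        double[][] xyz = compute(abc, def);
        System.out.println(java.util.Arrays.deepToString(xyz));
    }
}
```

The pool is sized to the processor count rather than a fixed 10, in line with the third point, and no second level of threads is created.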

The skeleton below follows these steps. Add the initialization of def and the minDistance calculation yourself:

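import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class MinDistanceCalculation implements Runnable {
    AtomicInteger idx = new AtomicInteger();   // next unclaimed index into abc
    int inpSize = 100;
    double[] abc = new double[inpSize];
    double[] def = ...
    double[] xyz = new double[inpSize];
    CountDownLatch counter = new CountDownLatch(inpSize);   // one count per element of xyz

    public void run() {
        for (;;) {
            int nextIndex = idx.getAndIncrement();
            if (nextIndex >= inpSize) return;   // every index has been claimed
            xyz[nextIndex] = minDistance(abc[nextIndex], def);
            counter.countDown();
        }
    }

    void start() throws InterruptedException {
        for (int k = 0; k < Runtime.getRuntime().availableProcessors(); k++) {
            new Thread(this).start();
        }
        counter.await();   // blocks until every element of xyz is computed
    }

    public static void main(String[] a) throws InterruptedException {
        new MinDistanceCalculation().start();
    }
}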
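A self-contained variant of the same idea, with the missing pieces filled in under explicit assumptions, might look like this. The class name SharedIndexMinDistance, the constructor, and the nearest helper are invented for the example. It also assumes one-dimensional values and that minDistance means "the element of def closest to the given value", which the skeleton above leaves open.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name; worker threads share one index counter and one latch.
class SharedIndexMinDistance implements Runnable {
    private final double[] abc;
    private final double[] def;
    private final double[] xyz;
    private final AtomicInteger idx = new AtomicInteger();
    private final CountDownLatch counter;

    SharedIndexMinDistance(double[] abc, double[] def) {
        this.abc = abc;
        this.def = def;
        this.xyz = new double[abc.length];
        this.counter = new CountDownLatch(abc.length);
    }

    // Assumed meaning of minDistance: the element of def nearest to value
    // (NaN if def is empty).
    static double nearest(double value, double[] def) {
        double best = Double.NaN;
        double bestDist = Double.POSITIVE_INFINITY;
        for (double candidate : def) {
            double dist = Math.abs(candidate - value);
            if (dist < bestDist) {
                bestDist = dist;
                best = candidate;
            }
        }
        return best;
    }

    @Override
    public void run() {
        for (;;) {
            int next = idx.getAndIncrement();   // claim the next index atomically
            if (next >= abc.length) return;     // nothing left to do
            xyz[next] = nearest(abc[next], def);
            counter.countDown();                // one more element finished
        }
    }

    // Starts one worker per processor and waits until xyz is complete.
    double[] compute() {
        int workers = Runtime.getRuntime().availableProcessors();
        for (int k = 0; k < workers; k++) {
            new Thread(this).start();
        }
        try {
            counter.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return xyz;
    }

    public static void main(String[] args) {
        double[] abc = { 0.0, 10.0, 4.0 };
        double[] def = { 1.0, 9.0, 5.0 };
        System.out.println(Arrays.toString(new SharedIndexMinDistance(abc, def).compute()));
    }
}
```

Each worker writes to a different slot of xyz, so the array itself needs no locking, and the countDown/await pair makes those writes visible to the thread that reads the result.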

Upvotes: 1
