Reputation: 2082
So I am trying to understand how ForkJoinPool works. I am trying to get better performance with it on a large array of about 2 million elements by summing their reciprocals. I understand that ForkJoinPool.commonPool().invoke(task); calls compute(), which forks the task into two subtasks if it is not small enough, computes them, and then joins the results. So far that uses two cores.
But if I want to execute this on multiple cores, how do I do that and achieve 4 times better performance than the usual single-threaded run? Below is my code using the default ForkJoinPool:
@Override
protected void compute() {
    if (endIndexExclusive - startIndexInclusive <= seq_count) {
        // Small enough: sum the reciprocals sequentially.
        for (int i = startIndexInclusive; i < endIndexExclusive; i++)
            value += 1 / input[i];
    } else {
        // Split the range in half, fork the left half, compute the right half here.
        ReciprocalArraySumTask left = new ReciprocalArraySumTask(startIndexInclusive,
                (endIndexExclusive + startIndexInclusive) / 2, input);
        ReciprocalArraySumTask right = new ReciprocalArraySumTask((endIndexExclusive + startIndexInclusive) / 2,
                endIndexExclusive, input);
        left.fork();
        right.compute();
        left.join();
        value = left.value + right.value;
    }
}
protected static double parArraySum(final double[] input) {
    assert input.length % 2 == 0;

    double sum = 0;
    // Compute sum of reciprocals of array elements
    ReciprocalArraySumTask task = new ReciprocalArraySumTask(0, input.length, input);
    ForkJoinPool.commonPool().invoke(task);
    return task.getValue();
}
// Here I am trying to use 4 cores
protected static double parManyTaskArraySum(final double[] input,
                                            final int numTasks) {
    double sum = 0;
    System.out.println("Total tasks = " + numTasks);
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(numTasks));

    // Compute sum of reciprocals of array elements
    int chunkSize = ReciprocalArraySum.getChunkSize(numTasks, input.length);
    System.out.println("Chunk size = " + chunkSize);
    ReciprocalArraySumTask task = new ReciprocalArraySumTask(0, input.length, input);
    ForkJoinPool pool = new ForkJoinPool();
    // pool.
    ForkJoinPool.commonPool().invoke(task);
    return task.getValue();
}
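Is something along these lines what I should be doing instead? This is just a sketch of what the unused pool above was meant for, passing the desired parallelism to the constructor:
ForkJoinPool pool = new ForkJoinPool(numTasks); // explicit parallelism instead of the common pool
ReciprocalArraySumTask task = new ReciprocalArraySumTask(0, input.length, input);
pool.invoke(task);
return task.getValue();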
Upvotes: 2
Views: 2220
Reputation: 181
This is my approach:
The threshold is the point at which compute() stops stacking recursive calls and actually does the calculation. This works better if each processor is used twice or more (there is a limit, of course), which is why I use numTasks * 2.
protected static double parManyTaskArraySum(final double[] input,
                                            final int numTasks) {
    int start;
    int end;
    int size = input.length;
    int threshold = size / (numTasks * 2);
    List<ReciprocalArraySumTask> actions = new ArrayList<>();
    for (int i = 0; i < numTasks; i++) {
        start = getChunkStartInclusive(i, numTasks, size);
        end = getChunkEndExclusive(i, numTasks, size);
        actions.add(new ReciprocalArraySumTask(start, end, input, threshold, i));
    }
    ForkJoinTask.invokeAll(actions);
    return actions.stream().map(ReciprocalArraySumTask::getValue).reduce(0.0, Double::sum);
}
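For context, here is a minimal sketch of the ReciprocalArraySumTask variant the code above assumes (the answer does not show it). It extends java.util.concurrent.RecursiveAction like the task in the question; the field names and the use of the fifth constructor argument as a plain task id are my assumptions:
// requires: import java.util.concurrent.RecursiveAction;
static class ReciprocalArraySumTask extends RecursiveAction {
    private final int startIndexInclusive;
    private final int endIndexExclusive;
    private final double[] input;
    private final int threshold;
    private final int taskId; // fifth constructor argument, used only as an identifier here
    private double value;

    ReciprocalArraySumTask(int startIndexInclusive, int endIndexExclusive,
                           double[] input, int threshold, int taskId) {
        this.startIndexInclusive = startIndexInclusive;
        this.endIndexExclusive = endIndexExclusive;
        this.input = input;
        this.threshold = threshold;
        this.taskId = taskId;
    }

    double getValue() {
        return value;
    }

    @Override
    protected void compute() {
        if (endIndexExclusive - startIndexInclusive <= threshold) {
            // Below the threshold: sum the reciprocals sequentially.
            for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
                value += 1 / input[i];
            }
        } else {
            // Above the threshold: split in half and recurse, as in the question.
            int mid = (startIndexInclusive + endIndexExclusive) / 2;
            ReciprocalArraySumTask left =
                    new ReciprocalArraySumTask(startIndexInclusive, mid, input, threshold, taskId);
            ReciprocalArraySumTask right =
                    new ReciprocalArraySumTask(mid, endIndexExclusive, input, threshold, taskId);
            left.fork();
            right.compute();
            left.join();
            value = left.value + right.value;
        }
    }
}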
Upvotes: 1
Reputation: 144
You want to use 4 cores, but you are giving it a job that only needs two cores. In the following example, the getChunkStartInclusive and getChunkEndExclusive methods return the start and end indexes of each chunk. I believe the following code can solve your problem and give you an idea for the implementation.
protected static double parManyTaskArraySum(final double[] input,
                                            final int numTasks) {
    double sum = 0;
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(numTasks));

    List<ReciprocalArraySumTask> ts = new ArrayList<ReciprocalArraySumTask>(numTasks);
    int i;
    // Fork the first numTasks - 1 chunks so they run on other workers.
    for (i = 0; i < numTasks - 1; i++) {
        ts.add(new ReciprocalArraySumTask(getChunkStartInclusive(i, numTasks, input.length),
                getChunkEndExclusive(i, numTasks, input.length), input));
        ts.get(i).fork();
    }
    // Compute the last chunk in the current thread.
    ts.add(new ReciprocalArraySumTask(getChunkStartInclusive(i, numTasks, input.length),
            getChunkEndExclusive(i, numTasks, input.length), input));
    ts.get(i).compute();
    // Wait for the forked tasks, then add up all partial sums.
    for (int j = 0; j < numTasks - 1; j++) {
        ts.get(j).join();
    }
    for (int j = 0; j < numTasks; j++) {
        sum += ts.get(j).getValue();
    }
    return sum;
}
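In case it helps, here is a minimal sketch of the chunking helpers used above (getChunkSize, getChunkStartInclusive, getChunkEndExclusive), assuming the usual ceiling-division scheme; the bodies are my assumption, not part of the original post:
private static int getChunkSize(final int nChunks, final int nElements) {
    // Ceiling of nElements / nChunks, so that all elements are covered.
    return (nElements + nChunks - 1) / nChunks;
}

private static int getChunkStartInclusive(final int chunk, final int nChunks, final int nElements) {
    return chunk * getChunkSize(nChunks, nElements);
}

private static int getChunkEndExclusive(final int chunk, final int nChunks, final int nElements) {
    // Clamp the last chunk so it does not run past the end of the array.
    return Math.min((chunk + 1) * getChunkSize(nChunks, nElements), nElements);
}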
Upvotes: 1