因此,我试图了解ForkJoinPool的工作方式。我正在尝试使用大约200万个元素的大数组来获得更好的性能,然后添加它们的倒数。我了解ForkJoinPool.commpnPool()。invoke(task);调用compute(),如果任务不小,它将把任务分叉到两个任务中,然后进行计算,然后将它们加入。到目前为止,我们正在使用两个核心。
但是,如果我想在多个内核上执行此操作,该如何实现,并获得比通常的单线程运行好4倍的性能?下面是我的默认ForkJoinPool()代码:
@Override
protected void compute() {
// TODO
if (endIndexExclusive - startIndexInclusive <= seq_count) {
for (int i = startIndexInclusive; i < endIndexExclusive; i++)
value += 1 / input[i];
} else {
ReciprocalArraySumTask left = new ReciprocalArraySumTask(startIndexInclusive,
(endIndexExclusive + startIndexInclusive) / 2, input);
ReciprocalArraySumTask right = new ReciprocalArraySumTask((endIndexExclusive + startIndexInclusive) / 2,
endIndexExclusive, input);
left.fork();
right.compute();
left.join();
value = left.value + right.value;
}
}
}
protected static double parArraySum(final double[] input) {
assert input.length % 2 == 0;
double sum = 0;
// Compute sum of reciprocals of array elements
ReciprocalArraySumTask task = new ReciprocalArraySumTask(0, input.length, input);
ForkJoinPool.commonPool().invoke(task);
return task.getValue();
}
//Here I am trying to achieve with 4 cores
protected static double parManyTaskArraySum(final double[] input,
final int numTasks) {
double sum = 0;
System.out.println("Total tasks = " + numTasks);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(numTasks));
// Compute sum of reciprocals of array elements
int chunkSize = ReciprocalArraySum.getChunkSize(numTasks, input.length);
System.out.println("Chunk size = " + chunkSize);
ReciprocalArraySumTask task = new ReciprocalArraySumTask(0, input.length, input);
ForkJoinPool pool = new ForkJoinPool();
// pool.
ForkJoinPool.commonPool().invoke(task);
return task.getValue();
}
您想使用4个核心,但是您要提供的工作将仅需要两个核心。在下面的示例中,getChunkStartInclusive和getChunkEndExclusive方法给出了每个块的开始和结束索引的范围。我相信以下代码可以解决您的问题,并为您提供一些实现想法。
protected static double parManyTaskArraySum(final double[] input,
final int numTasks) {
double sum = 0;
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", String.valueOf(numTasks));
List<ReciprocalArraySumTask> ts = new ArrayList<ReciprocalArraySumTask>(numTasks);
int i;
for (i = 0; i < numTasks - 1 ; i++) {
ts.add(new ReciprocalArraySumTask(getChunkStartInclusive(i,numTasks,input.length),getChunkEndExclusive(i,numTasks,input.length),input));
ts.get(i).fork();
}
ts.add( new ReciprocalArraySumTask(getChunkStartInclusive(i, numTasks, input.length), getChunkEndExclusive(i, numTasks, input.length), input));
ts.get(i).compute();
for (int j = 0; j < numTasks - 1; j++) {
ts.get(j).join();
}
for (int j = 0; j < numTasks; j++) {
sum += ts.get(j).getValue();
}
return sum;
}