我正在尝试用大约 200x200 网格对二维泊松方程进行数值求解。我正在尝试实现对角线方法以实现并行性:
#include <math.h>
#include <omp.h>
#include <stdio.h>
#include <stdlib.h>
int xf = -1;
int xl = 1;
int yf = -1;
int yl = 1;
double delta = 0.01;
double q(int i, int j) {
return (4 - 2 * (pow(xf + j * delta, 2) + pow(yl - i * delta, 2))) *
pow(delta, 2);
}
double actual(double x, double y, int n) {
return (pow(x, 2) - 1) * (pow(y, 2) - 1);
}
double error(double **mk, double **new, int n) {
double sum1 = 0;
double sum2 = 0;
double mf;
for (int i = 0; i < n; ++i) {
for (int j = 0; j < n; ++j) {
mf = mk[i][j];
sum1 += pow(mf, 2);
sum2 += pow(mf - new[i][j], 2);
}
}
return pow((double)sum2 / sum1, 0.5);
}
int main(void) {
int n = (xl - xf) / delta + 1;
double **phiactual = (double **)malloc(n * sizeof(double *));
for (int i = 0; i < n; i++)
phiactual[i] = (double *)malloc(n * sizeof(double));
double **phi = (double **)malloc(n * sizeof(double *));
for (int i = 0; i < n; i++)
phi[i] = (double *)malloc(n * sizeof(double));
int i, j, d;
int iter;
#pragma omp parallel shared(phi, phiactual, delta)
{
iter = 0;
#pragma omp for collapse(2)
for (int q = 0; q < n; ++q) {
for (int w = 0; w < n; ++w) {
phiactual[q][w] = actual(xf + w * delta, yl - q * delta, delta);
}
}
while (error(phiactual, phi, n) > 0.01) {
for (int g = 0; g < 2 * n - 5; ++g) {
if (g < n - 3) {
i = 1;
j = g + 1;
d = g + 1;
} else {
i = g - n + 4;
j = n - 2;
d = 2 * n - 5 - g;
}
#pragma omp for
for (int k = 0; k < d; ++k) {
phi[i][j] = 0.25 * (((double)(phi[i + 1][j] + phi[i][j + 1] +
phi[i - 1][j] + phi[i][j - 1])) +
q(i, j));
++i;
--j;
}
}
iter++;
}
#pragma omp single
printf("%i\n", iter);
}
for (int i = 0; i < n; i++)
free(phi[i]);
free(phi);
for (int i = 0; i < n; i++)
free(phiactual[i]);
free(phiactual);
return 0;
}
串行代码需要1分12秒,而并行版本只是无限期地运行。我使用 Schedule 子句尝试调整缓存捕获,但它也没有帮助。
带有
schedule(dynamic,1024)
的较小版本(20x20 网格)可以工作,但这本质上只是使其平行。
我不确定这种低效率是从哪里出现的。也欢迎对串行代码的效率提出意见。
我真的很惊讶它并没有更糟。看看你要求 openMP 并行化什么:
for (int k = 0; k < d; ++k) {
phi[i][j] = 0.25 * (((double)(phi[i + 1][j] + phi[i][j + 1] +
phi[i - 1][j] + phi[i][j - 1])) +
q(i, j));
++i;
--j;
您要求在不同的线程上运行此 for 循环的迭代,但计算取决于上一次迭代的值! (当前迭代的
phi[i+1][j]
是下一个迭代 phi[i][j+1]
)。
因此,您构建的算法不可能以这种简单的方式并行化。这是连续的,至少按照你所说的方式。