我用 C++ 编写了一个简单的矩阵乘法程序,并且它有效。我只是 C++ 的初学者,但我已经设法让它工作了。
令我困惑的是它比 NumPy 慢得多。我已经对其进行了基准测试。
因此我尝试使用 OpenMP 来加速,但我发现性能完全没有变化:
#include <algorithm>
#include <chrono>
#include <iostream>
#include <omp.h>
#include <string>
#include <vector>
using std::vector;
using std::chrono::high_resolution_clock;
using std::chrono::duration;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::cout;
using line = vector<double>;
using matrix = vector<line>;
void fill(line &l) {
std::generate(l.begin(), l.end(), []() { return ((double)rand() / (RAND_MAX)); });
}
matrix random_matrx(int64_t height, int64_t width) {
matrix mat(height, line(width));
std::for_each(mat.begin(), mat.end(), fill);
return mat;
}
matrix dot_product(const matrix &mat0, const matrix &mat1) {
size_t h0, w0, h1, w1;
h0 = mat0.size();
w0 = mat0[0].size();
h1 = mat1.size();
w1 = mat1[0].size();
if (w0 != h1) {
throw std::invalid_argument("matrices cannot be cross multiplied");
}
matrix out(h0, line(w1));
for (int y = 0; y < h0; y++) {
for (int x = 0; x < w1; x++) {
double s = 0;
for (int z = 0; z < w0; z++) {
s += mat0[y][z] * mat1[z][x];
}
out[y][x] = s;
}
}
return out;
}
matrix dot_product_omp(const matrix& mat0, const matrix& mat1) {
size_t h0, w0, h1, w1;
h0 = mat0.size();
w0 = mat0[0].size();
h1 = mat1.size();
w1 = mat1[0].size();
if (w0 != h1) {
throw std::invalid_argument("matrices cannot be cross multiplied");
}
matrix out(h0, line(w1));
omp_set_num_threads(4);
#pragma omp parallel for private(y, x, z) schedule(dynamic)
for (int y = 0; y < h0; y++) {
for (int x = 0; x < w1; x++) {
double s = 0;
for (int z = 0; z < w0; z++) {
s += mat0[y][z] * mat1[z][x];
}
out[y][x] = s;
}
}
return out;
}
int main()
{
matrix a, b;
a = random_matrx(16, 9);
b = random_matrx(9, 24);
auto start = high_resolution_clock::now();
for (int64_t i = 0; i < 65536; i++) {
dot_product(a, b);
}
auto end = high_resolution_clock::now();
duration<double, std::nano> time = end - start;
double once = time.count() / 65536000;
cout << "mat(16, 9) * mat(9, 24): " + std::to_string(once) + " microseconds\n";
a = random_matrx(128, 256);
b = random_matrx(256, 512);
start = high_resolution_clock::now();
for (int64_t i = 0; i < 512; i++) {
dot_product(a, b);
}
end = high_resolution_clock::now();
time = end - start;
once = time.count() / 512000;
cout << "mat(128, 256) * mat(256, 512): " + std::to_string(once) + " microseconds\n";
start = high_resolution_clock::now();
for (int64_t i = 0; i < 512; i++) {
dot_product_omp(a, b);
}
end = high_resolution_clock::now();
time = end - start;
once = time.count() / 512000;
cout << "mat(128, 256) * mat(256, 512) omp: " + std::to_string(once) + " microseconds\n";
}
PS D:\MyScript> C:\Users\Xeni\source\repos\matmul\x64\Release\matmul.exe
mat(16, 9) * mat(9, 24): 5.200116 microseconds
mat(128, 256) * mat(256, 512): 30128.739453 microseconds
mat(128, 256) * mat(256, 512) omp: 30116.103125 microseconds
我使用 Visual Studio 2022、C++20 标准、编译器标志编译它:
/permissive- /ifcOutput "x64\Release\" /GS /GL /W3 /Gy /Zc:wchar_t /Zi /Gm- /O2 /Ob2 /sdl /Fd"x64\Release\vc143.pdb" /Zc:inline /fp:precise /D "NDEBUG" /D "_CONSOLE" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /std:c17 /Gd /Oi /MD /std:c++20 /FC /Fa"x64\Release\" /EHsc /nologo /Fo"x64\Release\" /Ot /Fp"x64\Release\matmul.pch" /diagnostics:column
其他标志:
/arch:AVX2 /fp:fast
为什么没有改善?我怎样才能真正改善它?
我已将 OMP 版本更改为:
matrix dot_product_omp(const matrix& mat0, const matrix& mat1) {
size_t h0, w0, h1, w1;
h0 = mat0.size();
w0 = mat0[0].size();
h1 = mat1.size();
w1 = mat1[0].size();
if (w0 != h1) {
throw std::invalid_argument("matrices cannot be cross multiplied");
}
matrix out(h0, line(w1));
omp_set_num_threads(4);
#pragma omp parallel for schedule(dynamic)
for (int y = 0; y < h0; y++) {
for (int x = 0; x < w1; x++) {
double s = 0;
for (int z = 0; z < w0; z++) {
s += mat0[y][z] * mat1[z][x];
}
out[y][x] = s;
}
}
return out;
}
我使用
/openmp
标志进行编译,我已经进行了多次基准测试,它只使代码运行时间约为原始时间的四分之一:
PS D:\MyScript> C:\Users\Xeni\source\repos\matmul\x64\Release\matmul.exe
mat(16, 9) * mat(9, 24): 5.126476 microseconds
mat(128, 256) * mat(256, 512): 30999.137109 microseconds
mat(128, 256) * mat(256, 512) omp: 8574.475195 microseconds
NumPy 更快:
In [374]: a = np.random.random((128, 256))
In [375]: b = np.random.random((256, 512))
In [376]: %timeit a @ b
382 µs ± 19.6 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
我的代码慢了一个数量级。那么如何才能缩小性能差距呢?
代码中存在很多问题,但关键是内存访问模式效率低下并且它阻止(几乎)任何向量化。
对
mat1[z][x]
的访问效率很低,因为当 z
增加时,需要获取新的向量,然后从内存中获取第 x
项。这会导致两次类似随机的内存读取。这种内存访问比顺序内存访问慢得多。更不用说大多数编译器不会对具有此类内存访问的循环进行矢量化,因为这几乎是不可能的(理论上这对于 SIMD 集合来说是可能的,但实际上效率很低)。最重要的是,缓存行的使用很差:仅使用了与mat1
相关的缓存行的8/64字节,其余的都被浪费了,因为缓存行将很快从缓存中逐出(导致缓存垃圾)。这样的问题会导致应用程序在大多数平台上无法很好地扩展,因为它会受到“内存限制”(使用更多内核并不会使 RAM 运行得更快)。 您需要连续读取数据才能获得更好的性能。这是一个更快的实现:
#pragma omp parallel for schedule(dynamic)
for (int y = 0; y < h0; y++) {
for (int x = 0; x < w1; x++) {
out[y][x] += 0.0;
}
for (int z = 0; z < w0; z++) {
for (int x = 0; x < w1; x++) {
out[y][x] += mat0[y][z] * mat1[z][x];
}
}
}
Before:
mat(16, 9) * mat(9, 24): 2.931490 microseconds
mat(128, 256) * mat(256, 512): 14704.138781 microseconds
mat(128, 256) * mat(256, 512) omp: 4013.665295 microseconds
After:
mat(16, 9) * mat(9, 24): 0.931926 microseconds
mat(128, 256) * mat(256, 512): 3296.070098 microseconds
mat(128, 256) * mat(256, 512) omp: 1230.341350 microseconds
顺序代码比以前快 4.3 倍,并行代码现在快 3.3 倍。现在应该对代码进行矢量化。
由于其他因素,代码仍然相当低效,例如:
vector<vector<double>>
展平数组(或特征值);
schedule(dynamic)
schedule(static)
;
dgemm
BLAS 原语,这在大多数机器上接近最佳(至少对于 OpenBLAS、BLIS 和 Intel MKL)。如果没有低级 SIMD 内在函数或汇编代码,很难获得类似的性能(大多数编译器会生成次优代码,由于寄存器分配不当而无法实现最佳性能)。
很多HPC书籍和教程解释了此类问题以及如何解决它们。有些具体解释了如何获得相对快速的矩阵乘法。我强烈鼓励您阅读它们。 请注意,
private(y, x, z)
没有用,因为变量是在循环中声明的。事实上,这甚至是不正确的(像 GCC 这样的编译器会打印错误)。另请注意,使用
omp_set_num_threads(4);
通常被认为是一种不好的做法。您应该修改环境变量OMP_NUM_THREADS
。另外请注意,MSVC 支持的默认 OpenMP 版本非常旧且完全过时。应改用 Clang OpenMP 版本。事实上,据我所知 MSVC 不是一个好的优化编译器,因此应该使用 Clang/GCC(或任何 HPC 编译器,如 ICC、PGI 等)。