假设我正在尝试用另一个 MatrixXf 的滚动平均值填充 MatrixXf,如下所示
MatrixXf ComputeRollingMean(const MatrixXf& features) {
MatrixXf df(features.rows() - 30, features.cols());
for (size_t i = 30; i < features.rows(); ++i) {
MatrixXf mat = features.block(i-30, 0, 30, 3);
RowVectorXf mean_vector = mat.colwise().mean();
RowVectorXf stdev_vector = ((mat.rowwise() - mean_vector).colwise().squaredNorm() / (mat.rows()-1)).cwiseSqrt();
MatrixXf zscores = (mat.rowwise() - mean_vector).array().rowwise() / stdev_vector.array();
RowVectorXf zscore = zscores(last, all);
df.block(index, 0, 1, 3) = zscore;
}
return df;
}
Eigen 似乎有某种并行过程
#include <Eigen/Core>
然后
int main() {
Eigen::initParallel();
...}
但我不确定这如何在我的 ComputeRollingMean 函数中并行化 for 循环。有人知道我如何并行化,也许可以选择我想启动的进程数?
你需要的不是并行化而是简单的优化。根据经验,在用尽对串行代码的所有合理改进之前,永远不要并行化。例外情况可能适用于 GPU 等大规模并行架构,但即使它们通常也能从快速串行计算内核中获益。
您的代码存在一些问题:
features.block
分配给新矩阵时,您创建了一个毫无意义的副本zscores
矩阵,但只读取了最后一行小问题是:
size_t
,但 Eigen 的正确索引类型是 Eigen::Index
,这是有符号类型 std::ptrdiff_t
block()
或 row()
的地方使用 middleRows()
。这样不仅难以阅读,而且 Eigen 有时还可以使用有关切片形状的编译时信息来优化代码改进后的版本可能如下所示:
Eigen::MatrixXf ComputeZscore(const Eigen::MatrixXf& features) {
constexpr int windowsize = 30;
Eigen::MatrixXf df(features.rows() - windowsize, features.cols());
const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
Eigen::RowVectorXf mean_vector, stdev_vector;
for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
const auto& block = features.middleRows(i - windowsize, windowsize);
mean_vector = block.colwise().mean();
stdev_vector = (block.rowwise() - mean_vector)
.colwise().norm() * stdfactor;
df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
.array() / stdev_vector.array();
}
return df;
}
注意 Eigen 关于使用
auto
的警告。用在这里就可以了。
在我的测试中,这段代码的运行速度已经是原来的两倍了。
看起来你的矩阵只有三列。因此行向量也很小。如果在编译时知道大小,我们可以保存动态分配并允许编译器优化代码。我们可以通过宏做出这个决定,以便它可以停用。
#define OPTIMIZE_FIXED_COLS 1
#if OPTIMIZE_FIXED_COLS
constexpr Eigen::Index FeatureCols = 3;
#else
constexpr Eigen::Index FeatureCols = Eigen::Dynamic;
#endif
// Same as MatrixXf or MatrixX3f
using MatrixType = Eigen::Matrix<float, Eigen::Dynamic, FeatureCols>;
// Same as RowVectorXf or RowVectorX3f
using RowVectorType = Eigen::Matrix<float, 1, FeatureCols>;
MatrixType ComputeZscore(const MatrixType& features) {
constexpr int windowsize = 30;
MatrixType df(features.rows() - windowsize, features.cols());
const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
RowVectorType mean_vector, stdev_vector;
for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
const auto& block = features.middleRows(i - windowsize, windowsize);
mean_vector = block.colwise().mean();
stdev_vector = (block.rowwise() - mean_vector)
.colwise().norm() * stdfactor;
df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
.array() / stdev_vector.array();
}
return df;
}
这又带来了 14% 的加速。
您的算法取决于滚动平均值和标准差。如果您的平均值在
x[i, i + windowsize)
范围内,则只需减去 x[i + 1, i + windowsize + 1)
并加上值 x[i] / windowsize
即可计算出 x[i + windowsize] / windowsize
范围的平均值。现在您只需将添加数量从 30 减少到 2。
为了计算方差,有完整的维基百科文章以及示例代码。另外,还有一个StackOverflow 答案。
调整这些,我们得到:
MatrixType ComputeZscore(const MatrixType& features) {
constexpr int windowsize = 30;
const Eigen::Index out_rows = features.rows() - windowsize;
if(out_rows < 1)
return MatrixType(0, features.cols());
MatrixType df(out_rows, features.cols());
const float mean_factor = 1.f / windowsize;
const float var_factor = 1.f / (windowsize - 1);
RowVectorType mean[2], var_sum;
{ // initialize state with first row
const auto& block = features.topRows(windowsize);
mean[0] = block.colwise().sum() * mean_factor;
// actually variance
var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm() * var_factor;
const auto& sample_row = features.row(windowsize - 1);
df.row(0) = (sample_row - mean[0]).array() / var_sum.array().sqrt();
var_sum *= float(windowsize - 1); // variance to var_sum
}
for(Eigen::Index i = 1; i < out_rows; ++i) {
RowVectorType& old_mean = mean[(i - 1) & 1];
RowVectorType& new_mean = mean[i & 1];
const auto& new_x = features.row(windowsize + i - 1);
const auto& old_x = features.row(i - 1);
new_mean = old_mean + mean_factor * (new_x - old_x);
var_sum = var_sum.array()
+ (new_x - old_mean).array() * (new_x - new_mean).array()
- (old_x - old_mean).array() * (old_x - new_mean).array();
const auto& sample_row = new_x;
df.row(i) = (sample_row - new_mean).array() / (
var_sum * var_factor /*->variance*/).array().sqrt();
}
return df;
}
在我的测试中,它的运行速度比之前的版本快了 10 倍以上。
现在的并行化比原来要复杂一些,因为我们的主循环现在总是依赖于上一次迭代的结果。如果我们有足够多的列,我们可以简单地让每个线程处理一个列或一小段列。但是,您的示例只有 3 个。相反,我们可以让每个线程处理一个输出行块。
#include <omp.h>
// using omp_get_num_threads, omp_get_thread_num
void computeZscoreSegment(const MatrixType& features, MatrixType& df,
Eigen::Index first_row, Eigen::Index out_rows) {
if(! out_rows)
return;
constexpr int windowsize = 30;
const float mean_factor = 1.f / windowsize;
const float var_factor = 1.f / (windowsize - 1);
RowVectorType mean[2], var_sum;
{
const auto& block = features.middleRows(first_row, windowsize);
mean[0] = block.colwise().sum() * mean_factor;
// actually variance
var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm()
* var_factor;
const auto& sample_row = features.row(first_row + windowsize - 1);
df.row(first_row) = (sample_row - mean[0]).array()
/ var_sum.array().sqrt();
var_sum *= float(windowsize - 1); // variance to var_sum
}
for(Eigen::Index i = 1; i < out_rows; ++i) {
RowVectorType& old_mean = mean[(i - 1) & 1];
RowVectorType& new_mean = mean[i & 1];
const auto& new_x = features.row(first_row + windowsize + i - 1);
const auto& old_x = features.row(first_row + i - 1);
new_mean = old_mean + mean_factor * (new_x - old_x);
var_sum = var_sum.array()
+ (new_x - old_mean).array() * (new_x - new_mean).array()
- (old_x - old_mean).array() * (old_x - new_mean).array();
const auto& sample_row = new_x;
df.row(first_row + i) = (sample_row - new_mean).array() / (
var_sum * var_factor /*->variance*/).array().sqrt();
}
}
MatrixType ComputeZscore(const MatrixType& features) {
constexpr int windowsize = 30;
const Eigen::Index out_rows = features.rows() - windowsize;
if(out_rows < 1)
return MatrixType(0, features.cols());
MatrixType df(out_rows, features.cols());
# pragma omp parallel
{
// distribute rows evenly across threads
const int threads = omp_get_num_threads();
const int thread_num = omp_get_thread_num();
Eigen::Index thread_rows = out_rows / threads;
const int distr_modulo = out_rows % threads;
const Eigen::Index own_start = thread_rows * thread_num
+ std::min(thread_num, distr_modulo);
thread_rows += thread_num < distr_modulo;
computeZscoreSegment(features, df, own_start, thread_rows);
}
return df;
}
在我的系统上,6 核/12 线程的速度又提高了 4 倍,但这在很大程度上取决于您的硬件。我预计它会相对较快地变得受内存限制。
全部使用编译标志:
-O3 -march=native -fopenmp -DNDEBUG -fno-math-errno -fno-trapping-math
考虑转置输入/输出矩阵。由于 Eigen 的存储默认是列优先,因此
matrix.col(i)
比 matrix.row(i)
更高效。现在只有 3 列,这似乎根本不重要。但是,如果增加列数,就会变得很显着。