C++ 中特征矩阵的并行填充

问题描述 投票:0回答:1

假设我正在尝试用另一个 MatrixXf 的滚动平均值填充 MatrixXf,如下所示

MatrixXf ComputeRollingMean(const MatrixXf& features) {
    MatrixXf df(features.rows() - 30, features.cols());

    for (size_t i = 30; i < features.rows(); ++i) {
            MatrixXf mat = features.block(i-30, 0, 30, 3);
            RowVectorXf mean_vector = mat.colwise().mean();
            RowVectorXf stdev_vector = ((mat.rowwise() - mean_vector).colwise().squaredNorm() / (mat.rows()-1)).cwiseSqrt();
            MatrixXf zscores = (mat.rowwise() - mean_vector).array().rowwise() / stdev_vector.array();
            RowVectorXf zscore = zscores(last, all);
            df.block(index, 0, 1, 3) = zscore;
    }
    return df;
}

Eigen 似乎有某种并行过程

#include <Eigen/Core>

然后

int main() {
    Eigen::initParallel();
    ...}

但我不确定这如何在我的 ComputeRollingMean 函数中并行化 for 循环。有人知道我如何并行化,也许可以选择我想启动的进程数?

c++ parallel-processing eigen
1个回答
0
投票

你需要的不是并行化而是简单的优化。根据经验,在用尽对串行代码的所有合理改进之前,永远不要并行化。例外情况可能适用于 GPU 等大规模并行架构,但即使它们通常也能从快速串行计算内核中获益。

基本代码清理

您的代码存在一些问题:

  • 当您将
    features.block
    分配给新矩阵时,您创建了一个毫无意义的副本
  • 您可以在循环内分配和释放多个动态向量和矩阵,而不是将分配移至循环外
  • 您创建了一个完整的
    zscores
    矩阵,但只读取了最后一行

小问题是:

  • 您使用
    size_t
    ,但 Eigen 的正确索引类型是
    Eigen::Index
    ,这是有符号类型
    std::ptrdiff_t
  • 您可以在可以简单使用
    block()
    row()
    的地方使用
    middleRows()
    。这样不仅难以阅读,而且 Eigen 有时还可以使用有关切片形状的编译时信息来优化代码

改进后的版本可能如下所示:

Eigen::MatrixXf ComputeZscore(const Eigen::MatrixXf& features) {
    constexpr int windowsize = 30;
    Eigen::MatrixXf df(features.rows() - windowsize, features.cols());
    const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
    Eigen::RowVectorXf mean_vector, stdev_vector;
    for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
        const auto& block = features.middleRows(i - windowsize, windowsize);
        mean_vector = block.colwise().mean();
        stdev_vector = (block.rowwise() - mean_vector)
                .colwise().norm() * stdfactor;
        df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
                .array() / stdev_vector.array();
    }
    return df;
}

注意 Eigen 关于使用

auto
的警告。用在这里就可以了。

在我的测试中,这段代码的运行速度已经是原来的两倍了。

固定列数

看起来你的矩阵只有三列。因此行向量也很小。如果在编译时知道大小,我们可以保存动态分配并允许编译器优化代码。我们可以通过宏做出这个决定,以便它可以停用。

#define OPTIMIZE_FIXED_COLS 1

#if OPTIMIZE_FIXED_COLS
constexpr Eigen::Index FeatureCols = 3;
#else
constexpr Eigen::Index FeatureCols = Eigen::Dynamic;
#endif

// Same as MatrixXf or MatrixX3f
using MatrixType = Eigen::Matrix<float, Eigen::Dynamic, FeatureCols>;
// Same as RowVectorXf or RowVectorX3f
using RowVectorType = Eigen::Matrix<float, 1, FeatureCols>;

MatrixType ComputeZscore(const MatrixType& features) {
    constexpr int windowsize = 30;
    MatrixType df(features.rows() - windowsize, features.cols());
    const float stdfactor = static_cast<float>(1. / std::sqrt(windowsize - 1));
    RowVectorType mean_vector, stdev_vector;
    for(Eigen::Index i = windowsize; i < features.rows(); ++i) {
        const auto& block = features.middleRows(i - windowsize, windowsize);
        mean_vector = block.colwise().mean();
        stdev_vector = (block.rowwise() - mean_vector)
                .colwise().norm() * stdfactor;
        df.row(i - windowsize) = (block.row(windowsize - 1) - mean_vector)
                .array() / stdev_vector.array();
    }
    return df;
}

这又带来了 14% 的加速。

算法改进

您的算法取决于滚动平均值和标准差。如果您的平均值在

x[i, i + windowsize)
范围内,则只需减去
x[i + 1, i + windowsize + 1)
并加上值
x[i] / windowsize
即可计算出
x[i + windowsize] / windowsize
范围的平均值。现在您只需将添加数量从 30 减少到 2。

为了计算方差,有完整的维基百科文章以及示例代码。另外,还有一个StackOverflow 答案

调整这些,我们得到:

MatrixType ComputeZscore(const MatrixType& features) {
    constexpr int windowsize = 30;
    const Eigen::Index out_rows = features.rows() - windowsize;
    if(out_rows < 1)
        return MatrixType(0, features.cols());
    MatrixType df(out_rows, features.cols());
    const float mean_factor = 1.f / windowsize;
    const float var_factor = 1.f / (windowsize - 1);
    RowVectorType mean[2], var_sum;
    { // initialize state with first row
        const auto& block = features.topRows(windowsize);
        mean[0] = block.colwise().sum() * mean_factor;
        // actually variance
        var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm() * var_factor;
        const auto& sample_row = features.row(windowsize - 1);
        df.row(0) = (sample_row - mean[0]).array() / var_sum.array().sqrt();
        var_sum *= float(windowsize - 1); // variance to var_sum
    }
    for(Eigen::Index i = 1; i < out_rows; ++i) {
        RowVectorType& old_mean = mean[(i - 1) & 1];
        RowVectorType& new_mean = mean[i & 1];
        const auto& new_x = features.row(windowsize + i - 1);
        const auto& old_x = features.row(i - 1);
        new_mean = old_mean + mean_factor * (new_x - old_x);
        var_sum = var_sum.array()
                + (new_x - old_mean).array() * (new_x - new_mean).array()
                - (old_x - old_mean).array() * (old_x - new_mean).array();
        const auto& sample_row = new_x;
        df.row(i) = (sample_row - new_mean).array() / (
                var_sum * var_factor /*->variance*/).array().sqrt();
    }
    return df;
}

在我的测试中,它的运行速度比之前的版本快了 10 倍以上。

并行化

现在的并行化比原来要复杂一些,因为我们的主循环现在总是依赖于上一次迭代的结果。如果我们有足够多的列,我们可以简单地让每个线程处理一个列或一小段列。但是,您的示例只有 3 个。相反,我们可以让每个线程处理一个输出行块。

#include <omp.h>
// using omp_get_num_threads, omp_get_thread_num

void computeZscoreSegment(const MatrixType& features, MatrixType& df,
        Eigen::Index first_row, Eigen::Index out_rows) {
    if(! out_rows)
        return;
    constexpr int windowsize = 30;
    const float mean_factor = 1.f / windowsize;
    const float var_factor = 1.f / (windowsize - 1);
    RowVectorType mean[2], var_sum;
    {
        const auto& block = features.middleRows(first_row, windowsize);
        mean[0] = block.colwise().sum() * mean_factor;
        // actually variance
        var_sum = (block.rowwise() - mean[0]).colwise().squaredNorm()
                * var_factor;
        const auto& sample_row = features.row(first_row + windowsize - 1);
        df.row(first_row) = (sample_row - mean[0]).array()
                / var_sum.array().sqrt();
        var_sum *= float(windowsize - 1); // variance to var_sum
    }
    for(Eigen::Index i = 1; i < out_rows; ++i) {
        RowVectorType& old_mean = mean[(i - 1) & 1];
        RowVectorType& new_mean = mean[i & 1];
        const auto& new_x = features.row(first_row + windowsize + i - 1);
        const auto& old_x = features.row(first_row + i - 1);
        new_mean = old_mean + mean_factor * (new_x - old_x);
        var_sum = var_sum.array()
                + (new_x - old_mean).array() * (new_x - new_mean).array()
                - (old_x - old_mean).array() * (old_x - new_mean).array();
        const auto& sample_row = new_x;
        df.row(first_row + i) = (sample_row - new_mean).array() / (
                var_sum * var_factor /*->variance*/).array().sqrt();
    }
}
MatrixType ComputeZscore(const MatrixType& features) {
    constexpr int windowsize = 30;
    const Eigen::Index out_rows = features.rows() - windowsize;
    if(out_rows < 1)
        return MatrixType(0, features.cols());
    MatrixType df(out_rows, features.cols());
#   pragma omp parallel
    {
        // distribute rows evenly across threads
        const int threads = omp_get_num_threads();
        const int thread_num = omp_get_thread_num();
        Eigen::Index thread_rows = out_rows / threads;
        const int distr_modulo = out_rows % threads;
        const Eigen::Index own_start = thread_rows * thread_num
                + std::min(thread_num, distr_modulo);
        thread_rows += thread_num < distr_modulo;
        computeZscoreSegment(features, df, own_start, thread_rows);
    }
    return df;
}

在我的系统上,6 核/12 线程的速度又提高了 4 倍,但这在很大程度上取决于您的硬件。我预计它会相对较快地变得受内存限制。

全部使用编译标志:

-O3 -march=native -fopenmp -DNDEBUG -fno-math-errno -fno-trapping-math

转置输入/输出

考虑转置输入/输出矩阵。由于 Eigen 的存储默认是列优先,因此

matrix.col(i)
matrix.row(i)
更高效。现在只有 3 列,这似乎根本不重要。但是,如果增加列数,就会变得很显着。

© www.soinside.com 2019 - 2024. All rights reserved.