如何在 C/C++ 中启用具有多线程 FFTW 的 OpenMP?

问题描述 投票:0回答:1

当我在他们的文档中读到您应该创建所有 fftw 计划一次并执行多次时,我正在为 C/C++ 中 FFTW 的缓慢实现而苦苦挣扎,而我能够正确实现这一点。现在,我尝试将内置 openmp 功能与并行 FFTW 结合使用。我遵循了他们关于 fftw 线程安全的文档:

The only thread-safe routine in FFTW is fftw_execute.
All other routines (e.g. the planner) should only be called from one thread at a time.
So, for example, you can wrap a semaphore lock around any calls to the planner.

这就是我使用

std::lock_guard<std::mutex> lock(...
所做的 但是,现在的问题是我不知道如何或在哪里调用以下内容:

Third, before creating a plan that you want to parallelize, you should call:

void fftw_plan_with_nthreads(int nthreads);

我现在拥有的任何示例代码似乎都不起作用或无法“并行化”!

代码示例:

#include"omp.h"  
#include <mutex>
#include <thread>
#include "fftw3.h"

static const int nx = 128;
static const int ny = 128;
static const int ncomp = 2;
static const int nyk = ny/2 + 1;

class MyFftwClass {
public:
    MyFftwClass(void);
    ~MyFftwClass(void);
    void execute(double rArr[], double cArr[][ncomp]);

private:
    static fftw_plan s_plan; // <-- shared by all instances
    double *m_buffer_in;
    fftw_complex *m_buffer_out;
};


class MyFftwClass1 {
public:
    MyFftwClass1(void);
    ~MyFftwClass1(void);
    void execute1(double cArr[][ncomp],double rArr[]);

private:
    static fftw_plan s_plan1; // <-- shared by all instances
    fftw_complex *m_buffer_in1;
    double *m_buffer_out1;
};

int main(){ 
    
fftw_init_threads(); //before calling any FFTW routines, you should call the function
//This function, which should only be called once (probably in your main() function), performs any one-time initialization required to use threads on your system.
int nThreads =1;//4;
omp_set_num_threads(nThreads);

double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double)); 
    for (int i = 0; i < nx; i++){
        for (int j = 0; j < ny; j++){
          Function[j + ny*i]  = //some initialization; 
            
        }
    }

fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex)); 
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));

MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
r2c1.execute(Function,Functionk);

double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double)); 
memset(Out, 42, nx*ny* sizeof(double));

MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
c2r1.execute1(Functionk,Out);
//fftw_free stuff
}
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass1::MyFftwClass1(void)
{
    // serialize the initialization!
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);

    // allocate separate buffers for each instance
    m_buffer_in1 = fftw_alloc_complex(nx * nyk);
    m_buffer_out1 = fftw_alloc_real(nx * ny); 
    if (!(m_buffer_in1 && m_buffer_out1))
    {
        throw new std::runtime_error("Failed to allocate memory!");
    }

    // create plan *once* for all instances/threads
    if (!s_plan1)
    {
        s_plan1 = fftw_plan_dft_c2r_2d(nx, ny, m_buffer_in1, m_buffer_out1, FFTW_PATIENT);
        if (!s_plan1)
        {
            throw new std::runtime_error("Failed to create plan!");
        }
    }

}
MyFftwClass1::~MyFftwClass1(void)
{
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
    fftw_destroy_plan(s_plan1);
    fftw_free(m_buffer_in1);
    fftw_free(m_buffer_out1);
}

void MyFftwClass1::execute1(double cArr[][ncomp],double rArr[]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
    // No serialization is needed here
    memcpy(m_buffer_in1,cArr, sizeof(fftw_complex) * nx*(nyk));
    fftw_execute_dft_c2r(s_plan1, m_buffer_in1, m_buffer_out1); //instead of fftw_excute(plan)
    memcpy(rArr, m_buffer_out1,sizeof(double) * nx*ny);
    //(rArr, 1.0 / (nx*ny), rArr); //renormalize 
}

fftw_plan MyFftwClass1::s_plan1 = NULL;

std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass::MyFftwClass(void)
//int r2cfft_initialize(r2cfft_t *const ctx, const std::size_t nx, const std::size_t ny)
{
    // serialize the initialization!
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);

    //allocating buffers first before plan creating gets rid off seg fault error

    // allocate separate buffers for each instance
    m_buffer_in = fftw_alloc_real(nx * ny); 
    m_buffer_out = fftw_alloc_complex(nx * nyk);
    if (!(m_buffer_in && m_buffer_out))
    {
        throw new std::runtime_error("Failed to allocate memory!");
    }

    // create plan *once* for all instances/threads
   if (!s_plan)
    {
        s_plan = fftw_plan_dft_r2c_2d(nx, ny, m_buffer_in, m_buffer_out, FFTW_PATIENT);
        if (!s_plan)
        {
            throw new std::runtime_error("Failed to create plan!");
        }
    }

    
}

MyFftwClass::~MyFftwClass(void)
{
    std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
    fftw_free(m_buffer_in);
    fftw_free(m_buffer_out);
    fftw_destroy_plan(s_plan);
}

void MyFftwClass::execute(double rArr[], double cArr[][ncomp]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
    // No serialization is needed here
    memcpy(m_buffer_in, rArr,  sizeof(double) * nx*ny);
    fftw_execute_dft_r2c(s_plan, m_buffer_in, m_buffer_out); //instead of fftw_excute(plan)
    memcpy(cArr, m_buffer_out, sizeof(fftw_complex) * nx*(nyk));
}

fftw_plan MyFftwClass::s_plan = NULL;

这看起来是线程安全的和/或并行的吗?谢谢!

c++ multithreading openmp fftw
1个回答
0
投票

有两件事:

  1. 您与“FFTW_PATIENT”一起制定计划。这是非常昂贵的,对于线程来说更是如此。您需要经常重复使用计划,进行很多很多转换;或者或者你需要保存计划的智慧,这样成本只产生一次

  2. 你的FFT相当小。可能无法并行化

最小修复

这是一个经过最小修改的版本。还有很多我认为已经过时的做法,或者可以做得更好的做法。但让我们首先关注这一点:

int main(){

    fftw_init_threads();
    fftw_import_system_wisdom();
    fftw_import_wisdom_from_filename("/var/tmp/my_application_fftw.wisdom");
    int nThreads =1;//4;
    omp_set_num_threads(nThreads);
    fftw_plan_with_nthreads(nThreads);


    double *Function;
    Function= (double*) fftw_malloc(nx*ny*sizeof(double));
    for (int i = 0; i < nx; i++){
        for (int j = 0; j < ny; j++){
            Function[j + ny*i]  = 1.;//some initialization;
        }
    }

    fftw_complex *Functionk;
    Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
    memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));

    double *Out;
    Out = (double*) fftw_malloc(nx*ny*sizeof(double));
    memset(Out, 42, nx*ny* sizeof(double));
    MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
    MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
    for(int i = 0; i < 100000; ++i) {
        r2c1.execute(Function,Functionk);
        c2r1.execute1(Functionk,Out);
    }
    fftw_export_wisdom_to_filename("/var/tmp/my_application_fftw.wisdom");
    //fftw_free stuff
}

使用

g++ -O3 -lfftw -lfftw_omp -fopenmp
进行编译(如果
-lfftw_threads
不可用,则使用
fftw_omp

注意我如何重复使用相同的两个计划 100,000 次。您实际上需要利用这些计划。我们在应用程序调用之间缓存规划结果。最终的应用程序需要更多地考虑要将该文件放在哪里。例如,在集群上,它应该位于您的主目录中,而不是某些计算节点的临时目录中。

整个互斥体的内容与您编写的测试用例无关。仅当您从多个线程中并行创建计划时才需要它。您可以在代码的其他部分执行此操作,但不能在此处执行此操作。它不会损害测试用例,但也没有必要。

基准结果

如果我使用 128x128 大小的 FFT 和 100,000 次迭代来运行此程序,则 16 个线程在第一次运行时需要 19 秒(有计划),在以后的所有运行中需要 7 秒。一个线程最初需要 5.8 秒,使用缓存智慧则需要 4.6 秒。

如果我使用 1024x1024 大小的 FFT、16 线程、1000 次迭代运行此程序,则第一次运行时间为 3 分钟,第二次运行时间为 5 秒;使用所有线程。只有一个线程,第一次运行需要 34 秒,其他所有运行 9 秒。

总结

您的 FFT 大小可能太小,无法有效利用多线程。通过使用

fftw_plan_many_dft_r2c
进行批量独立的小型 FFT 或从外部
pragma omp parallel
部分调用许多独立的单线程 FFT,您将获得更好的结果

© www.soinside.com 2019 - 2024. All rights reserved.