当我在他们的文档中读到您应该创建所有 fftw 计划一次并执行多次时,我正在为 C/C++ 中 FFTW 的缓慢实现而苦苦挣扎,而我能够正确实现这一点。现在,我尝试将内置 openmp 功能与并行 FFTW 结合使用。我遵循了他们关于 fftw 线程安全的文档:
The only thread-safe routine in FFTW is fftw_execute.
All other routines (e.g. the planner) should only be called from one thread at a time.
So, for example, you can wrap a semaphore lock around any calls to the planner.
这就是我使用
std::lock_guard<std::mutex> lock(...
所做的
但是,现在的问题是我不知道如何或在哪里调用以下内容:
Third, before creating a plan that you want to parallelize, you should call:
void fftw_plan_with_nthreads(int nthreads);
我现在拥有的任何示例代码似乎都不起作用或无法“并行化”!
代码示例:
#include"omp.h"
#include <mutex>
#include <thread>
#include "fftw3.h"
static const int nx = 128;
static const int ny = 128;
static const int ncomp = 2;
static const int nyk = ny/2 + 1;
class MyFftwClass {
public:
MyFftwClass(void);
~MyFftwClass(void);
void execute(double rArr[], double cArr[][ncomp]);
private:
static fftw_plan s_plan; // <-- shared by all instances
double *m_buffer_in;
fftw_complex *m_buffer_out;
};
class MyFftwClass1 {
public:
MyFftwClass1(void);
~MyFftwClass1(void);
void execute1(double cArr[][ncomp],double rArr[]);
private:
static fftw_plan s_plan1; // <-- shared by all instances
fftw_complex *m_buffer_in1;
double *m_buffer_out1;
};
int main(){
fftw_init_threads(); //before calling any FFTW routines, you should call the function
//This function, which should only be called once (probably in your main() function), performs any one-time initialization required to use threads on your system.
int nThreads =1;//4;
omp_set_num_threads(nThreads);
double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double));
for (int i = 0; i < nx; i++){
for (int j = 0; j < ny; j++){
Function[j + ny*i] = //some initialization;
}
}
fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));
MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
r2c1.execute(Function,Functionk);
double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double));
memset(Out, 42, nx*ny* sizeof(double));
MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
c2r1.execute1(Functionk,Out);
//fftw_free stuff
}
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass1::MyFftwClass1(void)
{
// serialize the initialization!
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
// allocate separate buffers for each instance
m_buffer_in1 = fftw_alloc_complex(nx * nyk);
m_buffer_out1 = fftw_alloc_real(nx * ny);
if (!(m_buffer_in1 && m_buffer_out1))
{
throw new std::runtime_error("Failed to allocate memory!");
}
// create plan *once* for all instances/threads
if (!s_plan1)
{
s_plan1 = fftw_plan_dft_c2r_2d(nx, ny, m_buffer_in1, m_buffer_out1, FFTW_PATIENT);
if (!s_plan1)
{
throw new std::runtime_error("Failed to create plan!");
}
}
}
MyFftwClass1::~MyFftwClass1(void)
{
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
fftw_destroy_plan(s_plan1);
fftw_free(m_buffer_in1);
fftw_free(m_buffer_out1);
}
void MyFftwClass1::execute1(double cArr[][ncomp],double rArr[]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
// No serialization is needed here
memcpy(m_buffer_in1,cArr, sizeof(fftw_complex) * nx*(nyk));
fftw_execute_dft_c2r(s_plan1, m_buffer_in1, m_buffer_out1); //instead of fftw_excute(plan)
memcpy(rArr, m_buffer_out1,sizeof(double) * nx*ny);
//(rArr, 1.0 / (nx*ny), rArr); //renormalize
}
fftw_plan MyFftwClass1::s_plan1 = NULL;
std::mutex g_my_fftw_mutex; // <-- must lock before using any fftw_*() function, except for fftw_execute*()
MyFftwClass::MyFftwClass(void)
//int r2cfft_initialize(r2cfft_t *const ctx, const std::size_t nx, const std::size_t ny)
{
// serialize the initialization!
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
//allocating buffers first before plan creating gets rid off seg fault error
// allocate separate buffers for each instance
m_buffer_in = fftw_alloc_real(nx * ny);
m_buffer_out = fftw_alloc_complex(nx * nyk);
if (!(m_buffer_in && m_buffer_out))
{
throw new std::runtime_error("Failed to allocate memory!");
}
// create plan *once* for all instances/threads
if (!s_plan)
{
s_plan = fftw_plan_dft_r2c_2d(nx, ny, m_buffer_in, m_buffer_out, FFTW_PATIENT);
if (!s_plan)
{
throw new std::runtime_error("Failed to create plan!");
}
}
}
MyFftwClass::~MyFftwClass(void)
{
std::lock_guard<std::mutex> lock(g_my_fftw_mutex);
fftw_free(m_buffer_in);
fftw_free(m_buffer_out);
fftw_destroy_plan(s_plan);
}
void MyFftwClass::execute(double rArr[], double cArr[][ncomp]) //void MyFftwClass::execute(double *const in, fftw_complex *const out)
{
// No serialization is needed here
memcpy(m_buffer_in, rArr, sizeof(double) * nx*ny);
fftw_execute_dft_r2c(s_plan, m_buffer_in, m_buffer_out); //instead of fftw_excute(plan)
memcpy(cArr, m_buffer_out, sizeof(fftw_complex) * nx*(nyk));
}
fftw_plan MyFftwClass::s_plan = NULL;
这看起来是线程安全的和/或并行的吗?谢谢!
有两件事:
您与“FFTW_PATIENT”一起制定计划。这是非常昂贵的,对于线程来说更是如此。您需要经常重复使用计划,进行很多很多转换;或者或者你需要保存计划的智慧,这样成本只产生一次
你的FFT相当小。可能无法并行化
这是一个经过最小修改的版本。还有很多我认为已经过时的做法,或者可以做得更好的做法。但让我们首先关注这一点:
int main(){
fftw_init_threads();
fftw_import_system_wisdom();
fftw_import_wisdom_from_filename("/var/tmp/my_application_fftw.wisdom");
int nThreads =1;//4;
omp_set_num_threads(nThreads);
fftw_plan_with_nthreads(nThreads);
double *Function;
Function= (double*) fftw_malloc(nx*ny*sizeof(double));
for (int i = 0; i < nx; i++){
for (int j = 0; j < ny; j++){
Function[j + ny*i] = 1.;//some initialization;
}
}
fftw_complex *Functionk;
Functionk= (fftw_complex*) fftw_malloc(nx*nyk*sizeof(fftw_complex));
memset(Functionk, 42, nx*nyk* sizeof(fftw_complex));
double *Out;
Out = (double*) fftw_malloc(nx*ny*sizeof(double));
memset(Out, 42, nx*ny* sizeof(double));
MyFftwClass r2c1; //declare r2c1 of type MyFftwClass
MyFftwClass1 c2r1; //declare r2c1 of type MyFftwClass
for(int i = 0; i < 100000; ++i) {
r2c1.execute(Function,Functionk);
c2r1.execute1(Functionk,Out);
}
fftw_export_wisdom_to_filename("/var/tmp/my_application_fftw.wisdom");
//fftw_free stuff
}
使用
g++ -O3 -lfftw -lfftw_omp -fopenmp
进行编译(如果 -lfftw_threads
不可用,则使用 fftw_omp
)
注意我如何重复使用相同的两个计划 100,000 次。您实际上需要利用这些计划。我们在应用程序调用之间缓存规划结果。最终的应用程序需要更多地考虑要将该文件放在哪里。例如,在集群上,它应该位于您的主目录中,而不是某些计算节点的临时目录中。
整个互斥体的内容与您编写的测试用例无关。仅当您从多个线程中并行创建计划时才需要它。您可以在代码的其他部分执行此操作,但不能在此处执行此操作。它不会损害测试用例,但也没有必要。
如果我使用 128x128 大小的 FFT 和 100,000 次迭代来运行此程序,则 16 个线程在第一次运行时需要 19 秒(有计划),在以后的所有运行中需要 7 秒。一个线程最初需要 5.8 秒,使用缓存智慧则需要 4.6 秒。
如果我使用 1024x1024 大小的 FFT、16 线程、1000 次迭代运行此程序,则第一次运行时间为 3 分钟,第二次运行时间为 5 秒;使用所有线程。只有一个线程,第一次运行需要 34 秒,其他所有运行 9 秒。
fftw_plan_many_dft_r2c
进行批量独立的小型 FFT 或从外部 pragma omp parallel
部分调用许多独立的单线程 FFT,您将获得更好的结果