如何优化 boost 堆以在堆操作中胜过 std multiset? C++

问题描述 投票:0回答:1

我一直致力于实施快速行进方法。它是求解一类特殊微分方程的一种计算方法。特别是,此代码求解方程 $$| 阿布拉披| = 1$$ 对于 $$\phi$$ 给定边界条件 $$\phi = 0$$。无论如何,要实现 O(n lg n) 运行时间,需要使用支持 get_min()、extract_min()、decrease_key() 和 insert()(或 push())的优先级队列。此外,所讨论的算法将使用 n 个 insert() 操作、n 个 extract_min() 操作和最坏的 4n 个 decrease_key() 操作。现在,在我看来,像 boost 库中的 fibonacci_heap 这样的堆会比支持相同操作的 std set 好很多(减少键是通过擦除元素并读取它来实现的)。然而,事实并非如此,我想知道为什么?

(我想指出,无法使用标准优先级队列,因为它不支持 decrease_key())

这里是使用 d-ary-heap 的代码:

#include <vector> 
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>
#include <boost/heap/d_ary_heap.hpp>

using namespace boost::heap;

// Define epsilon value
#define EPS 0.0000000001

using namespace boost::heap;

struct treedist {
    double d;
    int row;
    int col;
    int ts;
    int known;
    bool operator<(const treedist& rhs) const
    {
        if (d == rhs.d) {
            if (known == rhs.known) {
                return ts > rhs.ts;
            }
            return known < rhs.known;
        }
        return d > rhs.d;
    }
};

struct fmmdist {
    int state;
    double d = -1;
    typename d_ary_heap<treedist,mutable_<true>,arity<2> >::handle_type it;
};

// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;

// Binary tree used to efficiently store the distances
d_ary_heap<treedist,mutable_<true>,arity<2> > distances;

// phi array
std::vector<std::vector<double> > phi;

// Size of grid
int rows; int columns;

// Spatial step
double ddx;

const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};

// initialize phi array for testing purposes
void initPhi() {
    ddx = 0.001;
    rows = 16001;
    columns = 16001;
    for (int i = 0; i < rows; i++ ) {
        std::vector<double> temp;
        phi.push_back(temp);
        for (int j = 0; j < columns; j++) {
            phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
        }
    }
}

// Initialize the state array
void initState() {
    // 0 means far, 1 means close, 2 means known
    for (int i = 0; i < rows; i++) {
        std::vector<fmmdist> temp;
        V.push_back(temp);
        for (int j = 0; j < columns; j++) {
            struct fmmdist f;
            f.state = 0;
            V[i].push_back(f);
        }
    }
}

// return largest root of quadratic with coef. a, b, c
// This assumes delta >= 0 (this holds for FMM method)
double quadratic(double a, double b, double c) {
    double delta = b * b - 4 * a * c;
    if ( delta > 0 ) {
        double r1 = (-b + std::sqrtf(delta))/(2*a);
        double r2 = (-b - std::sqrtf(delta))/(2*a);
        if (r1 > r2) {
            return r1;
        } 
        return r2;
    }
    return -b/(2*a);
}

// Initialization of the fast marching method
void initialization(bool inside) {
    // 0 means far, 1 means close, 2 means known
    // inside means we tag the points within the interface as known
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (inside) {
                if (phi[i][j] < EPS) { // bound used to be used here...
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                } 
            } else {
                if (phi[i][j] > -EPS) {
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                }
            }
        }
    }
    // find initial close points
    // see paper by sethian
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (V[i][j].state != 2) {
                int neigh = 0; // mod 16 this represents a specific configuration
                double dist = 0;
                int known = 0;
                std::vector<double> ndist;
                for (int k = 0; k < 4; k++) {
                    if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
                    if (V[i + dx[k]][j + dy[k]].state == 2) {
                        neigh += pow(2,k);
                        known++;
                        ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
                        
                    } 
                }
                if (ndist.size() == 1) {
                    // case a
                    dist = ndist[0];
                } else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
                    // case b
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
                } else if (neigh == 5 || neigh == 10) {
                    // case d
                    dist = fmin(ndist[0],ndist[1]);
                } else if (neigh == 13) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[1]);
                    dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
                } else if (neigh == 7) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 11) {
                    // case c, both horizontal
                    double v = fmin(ndist[1],ndist[2]);
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
                } else if (neigh == 14) {
                    // case c, both horizontal
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 15) {
                    // case e
                    double v = fmin(ndist[0],ndist[2]);
                    double h = fmin(ndist[1],ndist[3]);
                    dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
                }
                if (neigh > 0) {
                    // add to narrow band
                    struct treedist t;
                    t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
                    V[i][j].state = 1;
                    V[i][j].d = dist;
                    V[i][j].it = distances.push(t); 
                }
            }
        }
    }
}

// Determine x coefficients of backward/forward difference 
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
    int row1; 
    if (backward) {
        row1 = row - 1; 
    } else {
        row1 = row + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}

// Determine y coefficients of backward/forward difference 
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
    int col1;
    if (backward) {
        col1 = column - 1; 
    } else {
        col1 = column + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );       
}

double computeDist(int row, int column) {
    // Determine the x-coefficients
    std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
    if (row + 1 < rows && row - 1 >= 0 ) {
        if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
        else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
        else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    } else if ( row + 1 < rows) {
        if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
    } else if ( row - 1 >= 0) {
        if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    }
    // Determine the y-coefficients
    std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
    if (column + 1 < columns && column - 1 >= 0) {
        if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
        else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
        else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
    } else if ( column + 1 < columns) {
        if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
    } else if ( column - 1 >= 0){
        if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
    } 

    // return the largest root of the quadratic
    double a = std::get<0>(coefx) + std::get<0>(coefy);
    double b = std::get<1>(coefx) + std::get<1>(coefy);
    double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);
    double result = quadratic(a,b,c);
    assert(!isnan(result));
    return result;
}

// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
    int count = 1;
    while(distances.size() > 0) {
        // extract closest
        struct treedist temp = distances.top();
        int row = temp.row; int column = temp.col;
        distances.pop();
        V[row][column].state = 2;
        for (int k = 0; k < 4; k++) {
            if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
            if (V[row + dx[k]][column + dy[k]].state == 2) continue;
            double d;
            if (V[row + dx[k]][column + dy[k]].state == 1) {
                d = computeDist(row + dx[k], column + dy[k]);
                if (d < (*V[row + dx[k]][column + dy[k]].it).d) {
                    (*V[row + dx[k]][column + dy[k]].it).d = d;
                    (*V[row + dx[k]][column + dy[k]].it).known = (*V[row + dx[k]][column + dy[k]].it).known + 1;
                    V[row + dx[k]][column + dy[k]].d = d;
                    distances.increase(V[row + dx[k]][column + dy[k]].it);
                }
            } else if (V[row + dx[k]][column + dy[k]].state == 0) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
                V[row + dx[k]][column + dy[k]].state = 1;
                V[row + dx[k]][column + dy[k]].d = d;
                V[row + dx[k]][column + dy[k]].it = distances.push(t);
            }
        }
        count++;
    }
}

int main() {
    distances.reserve(pow(2056,2)); 
    initPhi();
    initState();
    std::cout << "Finished state init " << std::endl;
    initialization(true);
    std::cout << "Finished init of FMM " << std::endl;
    loopFMVel();
    return 0;
}

这是使用多重集的代码:

#include <vector> 
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>

// Define epsilon value
#define EPS 0.0000000001

struct treedist {
    double d;
    int row;
    int col;
    int ts;
    int known;
    bool operator<(const treedist& rhs) const
    {
        if (d == rhs.d) {
            if (known == rhs.known) {
                return ts < rhs.ts;
            }
            return known > rhs.known;
        }
        return d < rhs.d;
    }
};

struct fmmdist {
    int state;
    double d = -1;
    std::multiset<treedist>::iterator it;
};

// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;

// Binary tree used to efficiently store the distances
std::multiset<treedist> distances;

// phi array
std::vector<std::vector<double> > phi;

// Size of grid
int rows; int columns;

// Spatial step
double ddx;

const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};

// initialize phi array for testing purposes
void initPhi() {
    ddx = 0.001;
    rows = 16001;
    columns = 16001;
    for (int i = 0; i < rows; i++ ) {
        std::vector<double> temp;
        phi.push_back(temp);
        for (int j = 0; j < columns; j++) {
            phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
        }
    }
}

// Initialize the state array
void initState() {
    // 0 means far, 1 means close, 2 means known
    for (int i = 0; i < rows; i++) {
        std::vector<fmmdist> temp;
        V.push_back(temp);
        for (int j = 0; j < columns; j++) {
            struct fmmdist f;
            f.state = 0;
            V[i].push_back(f);
        }
    }
}

// return largest root of quadratic with coef. a, b, c
double quadratic(double a, double b, double c) {
    double delta = b * b - 4 * a * c;
    if ( delta > 0 ) {
        double r1 = (-b + std::sqrtf(delta))/(2*a);
        double r2 = (-b - std::sqrtf(delta))/(2*a);
        if (r1 > r2) {
            return r1;
        } 
        return r2;
    }
    return -b/(2*a);
}

// Initialization of the fast marching method
void initialization(bool inside) {
    // 0 means far, 1 means close, 2 means known
    // inside means we tag the points within the interface as known
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (inside) {
                if (phi[i][j] < EPS) { // bound used to be used here...
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                } 
            } else {
                if (phi[i][j] > -EPS) {
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                }
            }
        }
    }
    // find initial close points
    // see paper by sethian
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (V[i][j].state != 2) {
                int neigh = 0; // mod 16 this represents a specific configuration
                double dist = 0;
                int known = 0;
                std::vector<double> ndist;
                for (int k = 0; k < 4; k++) {
                    if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
                    if (V[i + dx[k]][j + dy[k]].state == 2) {
                        neigh += pow(2,k);
                        known++;
                        ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
                        
                    } 
                }
                if (ndist.size() == 1) {
                    // case a
                    dist = ndist[0];
                } else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
                    // case b
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
                } else if (neigh == 5 || neigh == 10) {
                    // case d
                    dist = fmin(ndist[0],ndist[1]);
                } else if (neigh == 13) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[1]);
                    dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
                } else if (neigh == 7) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 11) {
                    // case c, both horizontal
                    double v = fmin(ndist[1],ndist[2]);
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
                } else if (neigh == 14) {
                    // case c, both horizontal
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 15) {
                    // case e
                    double v = fmin(ndist[0],ndist[2]);
                    double h = fmin(ndist[1],ndist[3]);
                    dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
                }
                if (neigh > 0) {
                    // add to narrow band
                    struct treedist t;
                    t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
                    V[i][j].state = 1;
                    V[i][j].d = dist;
                    V[i][j].it = distances.insert(t); 
                }
            }
        }
    }
}

// Determine x coefficients of backward/forward difference 
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
    int row1; 
    if (backward) {
        row1 = row - 1; 
    } else {
        row1 = row + 1;
    }
    return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}

// Determine y coefficients of backward/forward difference 
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
    int col1; 
    if (backward) {
        col1 = column - 1; 
    } else {
        col1 = column + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );       
}


double computeDist(int row, int column) {
    // Determine the x-coefficients
    std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
    if (row + 1 < rows && row - 1 >= 0 ) {
        if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
        else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
        else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    } else if ( row + 1 < rows) {
        if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
    } else if ( row - 1 >= 0) {
        if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    }
    // Determine the y-coefficients
    std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
    if (column + 1 < columns && column - 1 >= 0) {
        if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
        else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
        else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
    } else if ( column + 1 < columns) {
        if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
    } else if ( column - 1 >= 0){
        if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
    } 
    // return the largest root of the quadratic
    double a = std::get<0>(coefx) + std::get<0>(coefy);
    double b = std::get<1>(coefx) + std::get<1>(coefy);
    double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);

    double result = quadratic(a,b,c);
    return result;
}

// Fast marching loop i
void loopFMVel() {
    int count = 1;
    while(distances.size() > 0) {
        // extract closest
        std::multiset<treedist>::iterator it = distances.begin();
        int row = (*it).row; int column = (*it).col;
        distances.erase(it);
        V[row][column].state = 2;
        for (int k = 0; k < 4; k++) {
            if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
            if (V[row + dx[k]][column + dy[k]].state == 2) continue;
            double d;
            if (V[row + dx[k]][column + dy[k]].state == 1) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = (*V[row + dx[k]][column + dy[k]].it).d; t.known = (*V[row + dx[k]][column + dy[k]].it).known;
                t.known++;
                t.row = row + dx[k]; t.col = column + dy[k]; t.ts = (*V[row + dx[k]][column + dy[k]].it).ts;
                if (d < t.d) {
                    t.d = d;
                    V[row + dx[k]][column + dy[k]].d = d;
                }
                distances.erase(V[row + dx[k]][column + dy[k]].it);
                V[row + dx[k]][column + dy[k]].it = distances.insert(t);
            } else if (V[row + dx[k]][column + dy[k]].state == 0) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
                V[row + dx[k]][column + dy[k]].state = 1;
                V[row + dx[k]][column + dy[k]].d = d;
                V[row + dx[k]][column + dy[k]].it = distances.insert(t);
            }
        }
        count++;
    }
    
    std::cout << "Finished with size " << distances.size() << std::endl;
}

int main() {
    initPhi();
    initState();
    std::cout << "Finished state init " << std::endl;
    initialization(true);
    std::cout << "Finished init of FMM " << std::endl;
    loopFMVel();
    return 0;
}

以下是我测试的一些结果:

N = 16001^2(我在带有标志 -Ofast -fno-finite-math-only -march=armv8.5-a -mcpu=native -ffast-math 的 M1 max 上运行了这些测试) binary_heap(保留内存):58.35s 多组:63.33s fibonacci_heap:73.43 16_ary_heap(保留内存):65.40s 配对堆:116.5s

我使用时间进行剖析。如果需要更多详细信息,我会很乐意添加它们。

performance c++11 boost binary-search-tree heap
1个回答
0
投票

我花了过多的时间重构代码,以便我能够真正理解它。

风格/性能

通读时我做了一些修改

  • 避免重复代码或子表达式(非常 容易出错,尤其是在复制粘贴的东西中)

  • 试图找到“有意义”的名字。

  • 移除魔法常量(如果你可以评论它们,你可以使用枚举)

  • 避免动态分配和增量插入。除非你定义

    USE_VECTOR
    我的版本使用
    std::array
    预先分配所有内容:

     #ifndef USE_VECTOR
         template <typename T, size_t C> using Row              = std::array<T, C>;
         template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
     #else
         template <typename T, size_t R, size_t C> struct Matrix {
             std::vector<std::vector<T>> impl_;
             Matrix() : impl_(R, std::vector<T>(C, T{})) {}
    
             auto& operator[](size_t n) const { return impl_[n]; }
             auto& operator[](size_t n)       { return impl_[n]; }
         };
     #endif
    

    同时用于

    phi
    V

     // phi array
     Matrix<double, rows, columns> phi;
    
     // Matrix representing state of point in fast marching method
     Matrix<fmmdist, rows, columns> V;
    
  • 避免像

    sqrtf
    这样的C-ism,更喜欢
    std::min
    而不是手动比较和交换

  • 我重写了

    initialization
    中的分支以始终使用相同的路径进行
    quadratic
    计算:

     double dist = pick_distance(ndist, neigh);
    

    实现(也使用

    static_vector<double, 4>
    最大 4 个相邻距离):

     using NDist = boost::container::static_vector<double, 4>;
    
     static double pick_distance(NDist const& ndist, unsigned configuration)  {
         if (ndist.size() == 1)
             return ndist[0]; // case a
    
         auto q_hv = [](double h, double v) {
             return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
         };
    
         switch (configuration) {
             case 3: case 12:
             case 6: case 9:  return q_hv(ndist[0], ndist[1]);                     // case b
             case 5: case 10: return std::min(ndist[0], ndist[1]);                 // case d
             case 13:         return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
             case 7:          return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
             case 11:         return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
             case 14:         return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
             case 15:         return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
             default:         return 0;
         }
     };
    
  • 将位添加到

    neigh
    位掩码(顺便说一句,这是一个聪明的技巧)可以更安全、更有效地完成:

     neigh |= (1 << k); // instead of neigh += pow(2,k);
    
  • 各种变量

    known
    ndist
    都有相当冗余的状态,遵守以下不变量:

     assert(known >= 0);
     assert(std::popcount(neigh) == known);
     assert(known == static_cast<int>(ndist.size()));
     assert((neigh != 0) == (known != 0));
     assert((neigh == 0) == ndist.empty());
    

    我建议删除一些未使用的值。有了这些,并提取重复的子表达式,

    initialization
    函数变得更具可读性:

     // Initialization of the fast marching method
     void initialization(bool inside) {
         // inside means we tag the points within the interface as known
         for (int i = 0; i < rows; i++)
             for (int j = 0; j < columns; j++)
                 if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
                     V[i][j].state = fmmdist::known;
                     V[i][j].dist  = 0;
                 }
    
         // find initial close points
         // see paper by James Sethian
         NDist ndist;
         for (int i = 0; i < rows; i++) {
             for (int j = 0; j < columns; j++) {
                 if (V[i][j].state != fmmdist::known) {
                     unsigned neigh = 0; // mod 16 this represents a specific configuration
    
                     ndist.clear();
                     for (unsigned k = 0; k < dx.size(); k++) {
                         auto const r = i + dx[k];
                         auto const c = j + dy[k];
                         if (r >= rows || r < 0 || c >= columns || c < 0)
                             continue;
    
                         if (V[r][c].state == fmmdist::known) {
                             neigh |= (1 << k);
    
                             using std::abs;
                             ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
                         }
                     }
    
                     if (neigh) {
                         double dist = pick_distance(ndist, neigh);
    
                         // add to narrow band
                         treedist t;
                         t.dist  = dist;
                         t.coord = {i, j};
                         t.ts    = 0;
                         t.known = ndist.size();
                         V[i][j] = {fmmdist::close, dist, queue.push(t)};
                     }
                 }
             }
         }
     }
    
  • 类似的处理

    loopFMVel()
    函数也有类似的效果:

     // Fast marching loop including simultaneous velocity extension
     void loopFMVel() {
         Timestamp tsclock = 1;
    
         while (!queue.empty()) {
             auto [row, column]   = queue.pop();
             V[row][column].state = fmmdist::known;
    
             for (unsigned k = 0; k < dx.size(); k++) {
                 auto const r = row + dx[k];
                 auto const c = column + dy[k];
                 if (r >= rows || r < 0 || c >= columns || c < 0)
                     continue;
    
                 auto& cell = V[r][c]; // TODO better name
                 if (cell.state == fmmdist::known)
                     continue;
    
                 auto dist = computeDist(r, c);
                 if (cell.state == fmmdist::close) {
                     if (auto& old = *cell.handle; dist < old.dist) {
                         cell.dist = dist;
                         queue.increase(cell.handle, dist);
                     }
                 } else if (cell.state == fmmdist::far) {
                     treedist temp;
                     temp.coord = {r, c};
                     temp.dist  = dist;
                     temp.known = 1;
                     temp.ts    = tsclock;
    
                     cell.state  = fmmdist::close;
                     cell.dist   = dist;
                     cell.handle = queue.push(temp);
                 }
             }
             tsclock += 1;
         }
     }
    

问题

一路上我发现了一些潜在的问题:

  1. 不小心将浮点数截断为整数

    的意思是在第129行使用

    int ::abs(int)
    吗?使用
    std::abs
    修复它以获得
    double std::abs(double)

    using std::abs;
    ndist.push_back(
        abs(phi[i][j]) * ddx /
        (abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])));
    
  2. 不一致

    computeDist
    。经过我的审查,该功能现在显示为:

    struct Coef {
        double      a, b, c;
        static Coef make(double d) { return {1.0, -2.0 * d, d * d}; };
    };
    
    double computeDist(int row, int column) {
        // Determine the x-coefficients
        auto closest = [](fmmdist const* next, fmmdist const* prev) {
            if (next && next->state != fmmdist::known) next = nullptr;
            if (prev && prev->state != fmmdist::known) prev = nullptr;
    
            if (!(next || prev))
                return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
    
            double d = std::numeric_limits<double>::infinity();
            if (next) d = std::min(d, next->dist);
            if (prev) d = std::min(d, prev->dist);
            return Coef::make(d);
        };
    
        Coef x = closest(                                   //
            row + 1 < rows ? &V[row + 1][column] : nullptr, //
            row > 0        ? &V[row - 1][column] : nullptr);
        Coef y = closest(                                         //
            column + 1 < columns ? &V[row][column + 1] : nullptr, //
            column > 0           ? &V[row][column - 1] : nullptr);
    
        // return the largest root of the quadratic
        double a = x.a + y.a;
        double b = x.b + y.b;
        double c = x.c + y.c - powf(ddx, 2);
    
        double result = quadratic(a, b, c);
        assert(!std::isnan(result));
        return result;
    }
    

    在这种形式中,

    rowCoef
    columnCoef
    之间的区别消失了。并强调了一个潜在的问题:

    if (!(next || prev))
        return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
    

    如果没有相邻的行/列可访问并且在

    known
    中,我们默认为突然不同的系数
    (0, 0, 0)
    ,我可能期望
    (2, -Inf, Inf)
    的一致性。如果您同意,我们可以删除特殊情况,使代码更加优雅:

    auto closest = [](auto*... candidates) {
        double dist = INFINITY;
    
        for (auto p : {candidates...})
            if (p && p->state == fmmdist::known)
                dist = std::min(dist, p->dist);
    
        return Coef{1.0, -2.0 * dist, dist * dist};
    };
    
  3. 比较你的版本,我注意到多集变体的行为不同:它always增加

    known
    不管距离是否减少。

    正如您在上面看到的,我假设您只想像在堆实现中一样进行更新,以便进行公平比较。

队列实现

我重构了代码,因此我可以使用相同的 FMM 实现切换队列实现:

struct QueueConcept {
    using Handle = /*stable handle type*/;

    size_t size()  const;
    bool   empty() const;

    Coord pop(); // extract closest
    Handle push(treedist v);

    void increase(Handle& h, double dist);
};

这对于

multiset
d_ary_heap
都是简单的实现。

struct DistanceTree {
    using Storage = std::multiset<treedist, treedist::Less>;
    using Handle  = typename Storage::const_iterator; // must be stable

    size_t size()  const { return _storage.size();  }
    bool   empty() const { return _storage.empty(); }

    Coord pop() { // extract closest
        Handle handle = _storage.begin();
        Coord  coord  = handle->coord;
        _storage.erase(handle);
        return coord;
    }

    Handle push(treedist v) { return _storage.insert(std::move(v)); }

    void increase(Handle& h, double dist) {
        treedist v = *h;
        _storage.erase(h);

        assert(dist <= v.dist);
        v.dist = dist;
        v.known += 1;

        h = push(std::move(v));
    }

  private:
    Storage _storage;
};

struct DistanceHeap {
    using Storage = bh::d_ary_heap<treedist, //
                                   bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
    using Handle  = Storage::handle_type;

    DistanceHeap() { _storage.reserve(2056 * 2056); }

    size_t size()  const { return _storage.size();  }
    bool   empty() const { return _storage.empty(); }

    Coord pop() { // extract closest
        auto& temp  = _storage.top();
        Coord  coord = temp.coord;
        _storage.pop();
        return coord;
    }

    Handle push(treedist v) { return _storage.push(std::move(v)); }

    void increase(Handle h, double dist) {
        auto& v = *h;
        assert(dist <= v.dist);
        v.dist = dist;
        v.known += 1;
        _storage.increase(h);
    }

  private:
    Storage _storage;
};

现在我像这样模板化

Algorithm
代码:

template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
    Queue queue; // prio queue used to efficiently store the distances

    // .. the rest
};

并添加了一个测试台:

#include <chrono>
#include <iomanip>
namespace {
    static long elapsed() {
        auto now = std::chrono::high_resolution_clock::now;
        using namespace std::chrono_literals;
        static auto start = now();
        auto        t     = now();

        return (t - std::exchange(start, t)) / 1ms;
    }

    void trace(auto const&... args) {
        ((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
    }

    template <typename Queue> void run() {
        trace(__PRETTY_FUNCTION__);

        auto a = std::make_unique<Algorithm<Queue, 5601>>();
        trace("Constructed");
        a->initPhi();
        a->initState();
        trace("Finished state init ");
        a->initialization(true);
        trace("Finished init of FMM ");
        a->loopFMVel();
        trace("Done");
    }
} // namespace

int main(int argc, char**) {
    std::ios::sync_with_stdio(false);
    trace("Start");
    if (argc == 1) run<DistanceTree>();
    if (argc >= 2) run<DistanceHeap>();
    if (argc == 3) run<DistanceTree>();
    trace("Exit");
}

现场演示

住在Coliru

#include <array>
#include <cassert>
#include <cmath>
#include <limits>
#include <memory>
#include <set>
#include <tuple>
#include <utility>
#include <vector>

#include <boost/container/static_vector.hpp>
#include <boost/heap/d_ary_heap.hpp>
namespace bh = boost::heap;

// Define epsilon value
static inline constexpr double EPS = 1e-10;

#ifndef USE_VECTOR
    template <typename T, size_t C> using Row              = std::array<T, C>;
    template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
#else
    template <typename T, size_t R, size_t C> struct Matrix {
        std::vector<std::vector<T>> impl_;
        Matrix() : impl_(R, std::vector<T>(C, T{})) {}

        auto& operator[](size_t n) const { return impl_[n]; }
        auto& operator[](size_t n)       { return impl_[n]; }
    };
#endif

using Timestamp = unsigned long;

struct Coord { int row, col; };

struct treedist {
    double    dist;
    int       known;
    Timestamp ts;
    Coord     coord;

  private:
    constexpr auto key() const { // natural order by nearest, greatest known and earliest ts
        return std::tuple(dist, -known, ts);
    }

  public:
    struct Less {
        constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() < b.key(); }
    };
    struct Greater {
        constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() > b.key(); }
    };
};

struct DistanceTree {
    using Storage = std::multiset<treedist, treedist::Less>;
    using Handle  = typename Storage::const_iterator; // must be stable

    size_t size()  const { return _storage.size();  }
    bool   empty() const { return _storage.empty(); }

    Coord pop() { // extract closest
        Handle handle = _storage.begin();
        Coord  coord  = handle->coord;
        _storage.erase(handle);
        return coord;
    }

    Handle push(treedist v) { return _storage.insert(std::move(v)); }

    void increase(Handle& h, double dist) {
        treedist v = *h;
        _storage.erase(h);

        assert(dist <= v.dist);
        v.dist = dist;
        v.known += 1;

        h = push(std::move(v));
    }

  private:
    Storage _storage;
};

struct DistanceHeap {
    using Storage = bh::d_ary_heap<treedist, //
                                   bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
    using Handle  = Storage::handle_type;

    DistanceHeap() { _storage.reserve(2056 * 2056); }

    size_t size()  const { return _storage.size();  }
    bool   empty() const { return _storage.empty(); }

    Coord pop() { // extract closest
        auto& temp  = _storage.top();
        Coord  coord = temp.coord;
        _storage.pop();
        return coord;
    }

    Handle push(treedist v) { return _storage.push(std::move(v)); }

    void increase(Handle h, double dist) {
        auto& v = *h;
        assert(dist <= v.dist);
        v.dist = dist;
        v.known += 1;
        _storage.increase(h);
    }

  private:
    Storage _storage;
};

template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
    Queue queue; // prio queue used to efficiently store the distances

    struct fmmdist {
        enum State { far = 0, close = 1, known = 2 };
        State                  state{far};
        double                 dist{-1};
        typename Queue::Handle handle{};
    };

    // phi array
    Matrix<double, rows, columns> phi;

    // Matrix representing state of point in fast marching method
    Matrix<fmmdist, rows, columns> V;

    // Spatial step
    static double constexpr ddx = 1e-3;

    static constexpr std::array<int, 4> dx{0, 1, 0, -1};
    static constexpr std::array<int, 4> dy{1, 0, -1, 0};

    // initialize phi array for testing purposes
    inline void initPhi() {
        for (int i = 0; i < rows; i++)
            for (int j = 0; j < columns; j++)
                phi[i][j] = sqrt(pow(-8 + ddx * i, 2) + pow(-8 + ddx * j, 2)) - 4;
    }

    // Initialize the state array
    void initState() {
        for (int i = 0; i < rows; i++)
            for (int j = 0; j < columns; j++)
                V[i][j] = {fmmdist::far, -1, {}};
    }

    // return largest root of quadratic with coef. a, b, c
    // This assumes delta >= 0 (this holds for FMM method)
    static double quadratic(double a, double b, double c) {
        double delta = b * b - 4 * a * c;
        if (delta > 0) {
            double r1 = (-b + std::sqrt(delta)) / (2 * a);
            double r2 = (-b - std::sqrt(delta)) / (2 * a);
            return std::max(r1, r2);
        }
        return -b / (2 * a);
    }

    using NDist = boost::container::static_vector<double, 4>;

    static double pick_distance(NDist const& ndist, unsigned configuration)  {
        if (ndist.size() == 1)
            return ndist[0]; // case a

        auto q_hv = [](double h, double v) {
            return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
        };

        switch (configuration) {
            case 3: case 12:
            case 6: case 9:  return q_hv(ndist[0], ndist[1]);                     // case b
            case 5: case 10: return std::min(ndist[0], ndist[1]);                 // case d
            case 13:         return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
            case 7:          return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
            case 11:         return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
            case 14:         return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
            case 15:         return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
            default:         return 0;
        }
    };

    // Initialization of the fast marching method
    void initialization(bool inside) {
        // inside means we tag the points within the interface as known
        for (int i = 0; i < rows; i++)
            for (int j = 0; j < columns; j++)
                if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
                    V[i][j].state = fmmdist::known;
                    V[i][j].dist  = 0;
                }

        // find initial close points
        // see paper by James Sethian
        NDist ndist;
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < columns; j++) {
                if (V[i][j].state != fmmdist::known) {
                    unsigned neigh = 0; // mod 16 this represents a specific configuration

                    ndist.clear();
                    for (unsigned k = 0; k < dx.size(); k++) {
                        auto const r = i + dx[k];
                        auto const c = j + dy[k];
                        if (r >= rows || r < 0 || c >= columns || c < 0)
                            continue;

                        if (V[r][c].state == fmmdist::known) {
                            neigh |= (1 << k);

                            using std::abs;
                            ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
                        }
                    }

                    if (neigh) {
                        double dist = pick_distance(ndist, neigh);

                        // add to narrow band
                        treedist t;
                        t.dist  = dist;
                        t.coord = {i, j};
                        t.ts    = 0;
                        t.known = ndist.size();
                        V[i][j] = {fmmdist::close, dist, queue.push(t)};
                    }
                }
            }
        }
    }

    struct Coef {
        double a, b, c;
    };

    double computeDist(int row, int column) {
        // Determine the x-coefficients
        auto closest = [](auto*... candidates) {
            double dist = INFINITY;

            for (auto p : {candidates...})
                if (p && p->state == fmmdist::known)
                    dist = std::min(dist, p->dist);

            return Coef{1.0, -2.0 * dist, dist * dist};
        };

        Coef x = closest(                                   //
            row + 1 < rows ? &V[row + 1][column] : nullptr, //
            row > 0        ? &V[row - 1][column] : nullptr);
        Coef y = closest(                                         //
            column + 1 < columns ? &V[row][column + 1] : nullptr, //
            column > 0           ? &V[row][column - 1] : nullptr);

        // return the largest root of the quadratic
        double a = x.a + y.a;
        double b = x.b + y.b;
        double c = x.c + y.c - powf(ddx, 2);

        double result = quadratic(a, b, c);
        assert(!std::isnan(result));
        return result;
    }

    // Fast marching loop including simultaneous velocity extension
    void loopFMVel() {
        Timestamp tsclock = 1;

        while (!queue.empty()) {
            auto [row, column]   = queue.pop();
            V[row][column].state = fmmdist::known;

            for (unsigned k = 0; k < dx.size(); k++) {
                auto const r = row + dx[k];
                auto const c = column + dy[k];
                if (r >= rows || r < 0 || c >= columns || c < 0)
                    continue;

                auto& cell = V[r][c]; // TODO better name
                if (cell.state == fmmdist::known)
                    continue;

                auto dist = computeDist(r, c);
                if (cell.state == fmmdist::close) {
                    if (auto& old = *cell.handle; dist < old.dist) {
                        cell.dist = dist;
                        queue.increase(cell.handle, dist);
                    }
                } else if (cell.state == fmmdist::far) {
                    treedist temp;
                    temp.coord = {r, c};
                    temp.dist  = dist;
                    temp.known = 1;
                    temp.ts    = tsclock;

                    cell.state  = fmmdist::close;
                    cell.dist   = dist;
                    cell.handle = queue.push(temp);
                }
            }
            tsclock += 1;
        }
    }
};

#include <chrono>
#include <iomanip>
#include <iostream>
namespace {
    static long elapsed() {
        auto now = std::chrono::high_resolution_clock::now;
        using namespace std::chrono_literals;
        static auto start = now();
        auto        t     = now();

        return (t - std::exchange(start, t)) / 1ms;
    }

    void trace(auto const&... args) {
        ((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
    }

    template <typename Queue> void run() {
        trace(__PRETTY_FUNCTION__);

        auto a = std::make_unique<Algorithm<Queue, 5601>>();
        trace("Constructed");
        a->initPhi();
        a->initState();
        trace("Finished state init ");
        a->initialization(true);
        trace("Finished init of FMM ");
        a->loopFMVel();
        trace("Done");
    }
} // namespace

int main(int argc, char**) {
    std::ios::sync_with_stdio(false);
    trace("Start");
    if (argc == 1) run<DistanceTree>();
    if (argc >= 2) run<DistanceHeap>();
    if (argc == 3) run<DistanceTree>();
    trace("Exit");
}

印刷(在线)

g++ -std=c++20 -O3 -DNDEBUG -ffast-math -Wall -pedantic -pthread main.cpp && ./a.out heap tree

     0ms Start
     0ms void {anonymous}::run() [with Queue = DistanceHeap]
  1529ms Constructed
   263ms Finished state init 
   228ms Finished init of FMM 
 10523ms Done
   267ms void {anonymous}::run() [with Queue = DistanceTree]
  1204ms Constructed
   282ms Finished state init 
   225ms Finished init of FMM 
  8305ms Done
   262ms Exit

本地演示:

© www.soinside.com 2019 - 2024. All rights reserved.