预分配的节点向量中的无锁树节点分配

问题描述 投票:1回答:1

我正在尝试多线程创建一个树,其中持有树的类在特定大小的块中预先分配std::vectorNode(概念上大小是任意的)。只有在必要时才会创建额外的Node块,这是因为树很快变得非常大,我想避免不断使用new算子来提高时间效率。

这些Nodes的载体定义为:std::vector< std::vector< Node > > nodes

head跟踪内部向量中的位置,chunkCount跟踪当前正在使用的外部向量。

向量在构造函数中调整为:

nodes.resize( 1 );
nodes[chunkCount].resize( CHUNK_SIZE );

Node的简化版本将是:

typedef struct Node {
    int val;
    Node* subnodes[5];
} Node;

创建新的Node如下:

void TreeClass::createNode( Node* node, short index, int val )
{
    omp_set_lock( &treeLock ); // treeLock belongs to TreeClass
    head++;
    if( head == CHUNK_SIZE ) {
        std::vector< Node > tempNodeVec( CHUNK_SIZE );
        nodes.push_back( tempNodeVec );
        chunkCount++;
        head = 0;
    }
    node->subnodes[index] = &( nodes[chunkCount][head] );
    omp_unset_lock( &treeLock );

    node->subnodes[index]->val = val;
}

这完全没问题。然而,我担心的是,除了一个线程之外的所有线程都在创建节点时被阻止,这种情况经常发生,因此花费了大量时间来阻止或锁定/解锁treeLock,因此我希望使这个功能无锁,但我的尝试到目前为止都失败了。

改变headchunkCount很容易,没有使用#pragma omp atomic锁定(或使用std::atomic< int >s),但这是确保if( ... )语句只执行一次并且在任何线程继续分配子地址之前的逻辑,即保证他们使用正确/更新的chunkCounthead

阅读关于无锁算法的一个想法是在std::atomic< Node* > subnodes[5]中使用Node并执行CAS操作等待正确更新的headchunkCnt但不知道什么是“正确的”,我怎么知道我在等什么呢?

另一个(天真的)想法是:

int myHead;
if( ++head == CHUNK_SIZE ) {
    std::vector< Node > tempNodeVec( CHUNK_SIZE );
    nodes.push_back( tempNodeVec );
    chunkCount++;
    myhead = head = 0;
} else {
    myhead = head;
    while( head > CHUNK_SIZE )
        myHead = ++head;
}
node->subnodes[index] = &( nodes[chunkCount][myHead] );

想法是只有一个线程进入if( ... ),直到它将head设置为0,其余的将被卡在else { ... }但我已经可以看到这种方法的许多问题。

任何帮助都将不胜感激。

c++ multithreading tree openmp lock-free
1个回答
1
投票

我建议你使用线程专用内存池。为此,您可以使用如下注释:

#pragma omp threadprivate(nodes)

这不仅比尝试保护对共享内存池的访问要简单得多,而且由于数据局部性,性能也可能更好。

注意:基于原子与你的解决方案无锁是不可能的,因为nodes[chunkCount] - 每次分配都需要 - 必须始终受到nodes.push_back的保护。

功能齐全的内存池更复杂,但只需一步,您可以尝试使用std::deque。它提供了你所需要的东西而不会弄乱两个向量 - 在恒定时间内插入元素而不会使现有元素的指针无效。你控制力较弱,但这是一个好的开始。

© www.soinside.com 2019 - 2024. All rights reserved.