多源BFS多线程

问题描述 投票:0回答:1

我有一个用邻接矩阵arr表示的图。以及多个起始顶点的向量source

[我的想法是根据线程数将source向量分为“相等”的部分(如果不均等地分割,则将剩余部分加到最后一部分)。并创建运行此功能的线程。 bool used[]是全局array

我正在尝试(我认为这是)“线性”缩放。我假设起始顶点的数量至少等于线程的数量。

如果我使用互斥锁来同步线程,则效率很低。如果不这样做,那么将遍历一些顶点,然后再遍历一次。问题是否有让我删除互斥锁的数据结构?或另一种实现此算法的方法?

mutex m;
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
                                                   // piece of the original source 
{
    queue<int> que;
    for(auto i  = 0; i < s.size(); ++i)
    {
        que.push(s[i]);
        used[s[i]] = true; 
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        cout << curr << " ";
        for (auto i = 0; i < n; ++i)
        {
            lock_guard<mutex> guard(m);
            if (arr[curr][i] == 1 && !used[i] && curr != i)
            {
                que.push(i);
                used[i] = true;
            }
        }
    }
}```

c++ multithreading breadth-first-search
1个回答
0
投票

atomic<bool>,我想你快到了。唯一缺少的是原子交换操作。它允许您以原子操作方式进行读取-修改-写入。对于bool原子类型,通常有一块支持它的硬件。

void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
                                                   // piece of the original source 
{
    //used[i] initialized to 'false' for all i
    queue<int> que;
    for(auto i  = 0; i < s.size(); ++i)
    {
        que.push(s[i]);
        //we don't change used just yet!
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        bool alreadyUsed = used[curr].exchange(true);
        if(alreadyUsed) {
            continue; //some other thread already processing it
        }
        cout << curr << " ";
        for (auto i = 0; i < n; ++i) {
            if (arr[curr][i] == 1 && !used[i] && curr != i) {
                que.push(i);
            }
        }
    }
}

注意,这是一个逻辑更改:当线程开始处理节点时,而不是将其添加到队列时,used[i]设置为true。在首次尝试处理节点时,当used[i]设置为true时,alreadyUsed将保留先前的值(false),该值指示没有其他人更早地开始处理该节点。在随后尝试处理该节点时,alreadyUsed将已经设置为true,并且处理将跳过。

上面的方法不是理想的:在处理节点之前,可以将一个节点多次添加到队列中。根据图形的形状,可能有问题也可能没有问题。

如果这是一个问题-我建议使用树值逻辑:not_visitedqueuedprocessed

static constexpr int not_visited = 0;
static constexpr int queued = 1;
static constexpr int processed = 2;

现在,每次我们尝试推入que时,并且每次尝试处理节点时,我们都会相应地推进状态。这些进步需要通过compare_exchange_strong原子地执行,以便每个更改可能恰好发生一次。如果成功调用compare_exchange_strong,则返回true(即先前包含的值实际上与expected匹配)]

void msBFS(bool** arr, int n, vector<int> s, atomic<int>* used) //s is a different
                                                   // piece of the original source 
{

    //used[i] initialized to '0' for all i
    queue<int> que;
    int empty = 0;
    for(auto i = 0; i < s.size(); ++i) {
        int expected = not_visited;
        //we check it even here, because one thread may be seriously lagging behind others which already entered the while loop
        if(used[s[i]].compare_exchange_strong(expected, queued)) {
            que.push(s[i]);
        }
    }
    while (!que.empty())
    {
        int curr = que.front();
        que.pop();
        int expected = queued;
        if(!used[s[i]].compare_exchange_strong(expected, processed)) {
            continue;
        }
        cout << curr << " ";
        for (auto i = 0; i < n; ++i) {
            if (arr[curr][i] == 1 && curr != i) {
                int expected = not_visited;
                if(used[i].compare_exchange_strong(expected, queued)) {
                    que.push(i);
                }                    
            }
        }
    }
}

检查性能。原子操作有很多,但通常比互斥体便宜。在内部,互斥锁还执行类似于这些操作的原子操作,但除此之外,它可能会完全阻塞线程。我显示的代码从不阻塞(线程永远不会停止),所有同步仅在原子变量上完成。

© www.soinside.com 2019 - 2024. All rights reserved.