我有一个用邻接矩阵arr
表示的图。以及多个起始顶点的向量source
。
[我的想法是根据线程数将source
向量分为“相等”的部分(如果不均等地分割,则将剩余部分加到最后一部分)。并创建运行此功能的线程。 bool used[]
是全局array
我正在尝试(我认为这是)“线性”缩放。我假设起始顶点的数量至少等于线程的数量。
如果我使用互斥锁来同步线程,则效率很低。如果不这样做,那么将遍历一些顶点,然后再遍历一次。问题是否有让我删除互斥锁的数据结构?或另一种实现此算法的方法?
mutex m;
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
used[s[i]] = true;
}
while (!que.empty())
{
int curr = que.front();
que.pop();
cout << curr << " ";
for (auto i = 0; i < n; ++i)
{
lock_guard<mutex> guard(m);
if (arr[curr][i] == 1 && !used[i] && curr != i)
{
que.push(i);
used[i] = true;
}
}
}
}```
atomic<bool>
,我想你快到了。唯一缺少的是原子交换操作。它允许您以原子操作方式进行读取-修改-写入。对于bool
原子类型,通常有一块支持它的硬件。
void msBFS(bool** arr, int n, vector<int> s, atomic<bool>* used) //s is a different
// piece of the original source
{
//used[i] initialized to 'false' for all i
queue<int> que;
for(auto i = 0; i < s.size(); ++i)
{
que.push(s[i]);
//we don't change used just yet!
}
while (!que.empty())
{
int curr = que.front();
que.pop();
bool alreadyUsed = used[curr].exchange(true);
if(alreadyUsed) {
continue; //some other thread already processing it
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && !used[i] && curr != i) {
que.push(i);
}
}
}
}
注意,这是一个逻辑更改:当线程开始处理节点时,而不是将其添加到队列时,used[i]
设置为true
。在首次尝试处理节点时,当used[i]
设置为true时,alreadyUsed
将保留先前的值(false
),该值指示没有其他人更早地开始处理该节点。在随后尝试处理该节点时,alreadyUsed
将已经设置为true
,并且处理将跳过。
上面的方法不是理想的:在处理节点之前,可以将一个节点多次添加到队列中。根据图形的形状,可能有问题也可能没有问题。
如果这是一个问题-我建议使用树值逻辑:not_visited
,queued
和processed
。
static constexpr int not_visited = 0;
static constexpr int queued = 1;
static constexpr int processed = 2;
现在,每次我们尝试推入que
时,并且每次尝试处理节点时,我们都会相应地推进状态。这些进步需要通过compare_exchange_strong
原子地执行,以便每个更改可能恰好发生一次。如果成功调用compare_exchange_strong
,则返回true
(即先前包含的值实际上与expected
匹配)]
void msBFS(bool** arr, int n, vector<int> s, atomic<int>* used) //s is a different
// piece of the original source
{
//used[i] initialized to '0' for all i
queue<int> que;
int empty = 0;
for(auto i = 0; i < s.size(); ++i) {
int expected = not_visited;
//we check it even here, because one thread may be seriously lagging behind others which already entered the while loop
if(used[s[i]].compare_exchange_strong(expected, queued)) {
que.push(s[i]);
}
}
while (!que.empty())
{
int curr = que.front();
que.pop();
int expected = queued;
if(!used[s[i]].compare_exchange_strong(expected, processed)) {
continue;
}
cout << curr << " ";
for (auto i = 0; i < n; ++i) {
if (arr[curr][i] == 1 && curr != i) {
int expected = not_visited;
if(used[i].compare_exchange_strong(expected, queued)) {
que.push(i);
}
}
}
}
}
检查性能。原子操作有很多,但通常比互斥体便宜。在内部,互斥锁还执行类似于这些操作的原子操作,但除此之外,它可能会完全阻塞线程。我显示的代码从不阻塞(线程永远不会停止),所有同步仅在原子变量上完成。