head.load()->next是否会破坏while(head && !head.compare_exchange_weak(ptr, head.load()->next)中使用的std::atomic::compare_exchange_weak的原子性);?我相信应该如此,但在输出中似乎并非如此(在代码示例之后)。或者甚至,这个问题有意义吗?
另一个问题: new_node->next = head.exchange(new_node); 是一个原子操作吗?我的意思是,当然 std::atomic::exchange 是原子的,但是返回值的赋值又如何呢? (假设变量new_node被其他线程访问,任何线程都可以在返回值的赋值之间介入吗?)
这是代码示例(使用 -std=c++17 标志编译):
#include <iostream>
#include <thread>
#include <atomic>
#include <iomanip>
#include <array>
#include <future>
#include <mutex>
#include <exception>
static constexpr unsigned num_of_thread{ 100U };
struct Node{
unsigned data;
Node* next;
Node() : data{0}, next{nullptr} {}
Node(unsigned val) : data{val}, next{nullptr} {}
};
void test1() {
std::atomic<Node*> head{ nullptr };
std::cout << "Is lock-free?: " << std::boolalpha << head.is_lock_free() << std::endl;
auto add_new_node = [&head](std::mutex& mut,
std::condition_variable& cond_var,
bool& guarantor,
unsigned i)
{
// Every thread creates its own unique node, so, no race condition here
Node* new_node{ new Node{i} };
{
std::unique_lock<std::mutex> lck{ mut };
cond_var.wait(lck, [&guarantor](){ return guarantor; });
// All threads pile up here and wait for notification
// (basically what std::barrier does)
}
// Ideally, all threads hit the below line at the same time.
// Would there be a race condition here if new_node was a shared variable?
// (Between the "exchange" atomic operation and the assignment of return value)
new_node->next = head.exchange(new_node);
};
auto pop_a_node = [&head](std::mutex& mut,
std::condition_variable& cond_var,
bool& guarantor)
{
Node* ptr{ head };
{
std::unique_lock<std::mutex> lck{ mut };
cond_var.wait(lck, [&guarantor](){ return guarantor; });
// All threads pile up here and wait for notification
// (basically what std::barrier does)
}
// Do we break atomicity here by using head.load()->next?
// Or even, does this question make sense?
while(head && !head.compare_exchange_weak(ptr, head.load()->next));
// Using ptr from this point onwards is safe as it is local to every thread
if(ptr) {
unsigned retval{ ptr->data };
delete ptr;
// Protect concurrent access to std::cout (nothing else)
// in order not to get a scrambled output
mut.lock();
std::cout << retval << ' ';
mut.unlock();
}
else {
mut.lock();
std::cout << "* ";
mut.unlock();
}
};
// Half of the threads add, the other half pops nodes
[&pop_a_node, &add_new_node]() {
std::condition_variable cond_var{};
bool guarantor{ false }; // Needed in order to prevent spurious wakeups
std::mutex mut;
std::array<std::thread, num_of_thread> threads{};
for(unsigned i{}; i<num_of_thread; i++) {
if(i%2U) {
threads[i] = std::move(std::thread{add_new_node,
std::ref(mut),
std::ref(cond_var),
std::ref(guarantor),
i});
}
else {
threads[i] = std::move(std::thread{pop_a_node,
std::ref(mut),
std::ref(cond_var),
std::ref(guarantor)});
}
}
// Wake all threads up at the same time. This should allow
// more race condition possibilities compared to
// just starting new threads repeatedly
guarantor = true;
cond_var.notify_all();
for(unsigned i{}; i<num_of_thread; i++) {
threads[i].join();
}
}();
}
int main() {
test1();
return 0;
}
这是一个示例输出: 是否无锁?: 是的
(* 是当线程由于遇到空列表而无法弹出元素时。我检查了丢失的元素仍在列表中。)
在 while 循环中
while(head && !head.compare_exchange_weak(ptr, head.load()->next));
你有 3 个不同的原子操作,它们按顺序排列,但整个事情不是原子的,所以其他线程可能会进来并在它们之间做事情
next
)明显的问题是,如果两个线程同时执行“pop”操作,则可能会让第二个线程进入并在第一个线程的步骤 1 和步骤 2 之间执行其操作。如果当时堆栈上只有一个东西,那么第一个线程可以通过 null 检查,然后在第二步中获得 nullptr(并且可能会崩溃取消引用它)。
通常人们会把这个循环写成
while((ptr = head.load()) && !head.compare_exchange_weak(ptr, ptr->next));
也就是说,您只在循环中加载一次 head 值,然后使用该本地值。如果在一个线程尝试弹出时可以弹出、释放、重新分配和重新推送节点,那么这仍然存在经典的 ABA 竞争问题。这可能是问题,也可能不是问题——只要弹出线程在完全弹出节点之前不尝试对节点执行任何操作,它是否切换到其他节点并不重要。它仍然是一个已推送且未被其他人弹出的节点。
您的推送原语也有类似的问题
new_node->next = head.exchange(new_node);
存储到
new_node->next
中对于交换来说不是原子的,因此另一个线程可能在head.exchange
之后但在存储之前访问它。所以代码应该是:
do { new_node->next = head.load(); } while (!head.compare_exchange_weak(new_node->next, new_node));
这确保存储发生在发布新推送节点的原子操作之前。
head->compareExchangeWeak(x,y)
是一个函数调用。与任何函数调用一样,参数表达式(在本例中为
x
和
y
)在调用函数之前进行计算。 你写了
head.compare_exchange_weak(ptr, head.load()->next))
。
表达式、 ptr
和
head.load()->next
都在调用compareExchangeWeak 之前计算。传入compare_exchange_weak函数的实际argments只是两个指针——预期的和新的。到那时,对
head.load()
的呼唤已经成为古老的历史。
这能回答你的问题吗?