如果 std::atomic<T>::compare_exchange_weak 的期望值是非原子操作的返回值,它仍然是原子的吗?

问题描述 投票:0回答:2

head.load()->next是否会破坏while(head && !head.compare_exchange_weak(ptr, head.load()->next)中使用的std::atomic::compare_exchange_weak的原子性);?我相信应该如此,但在输出中似乎并非如此(在代码示例之后)。或者甚至,这个问题有意义吗?

另一个问题: new_node->next = head.exchange(new_node); 是一个原子操作吗?我的意思是,当然 std::atomic::exchange 是原子的,但是返回值的赋值又如何呢? (假设变量new_node被其他线程访问,任何线程都可以在返回值的赋值之间介入吗?)

这是代码示例(使用 -std=c++17 标志编译):

#include <iostream>
#include <thread>
#include <atomic>
#include <iomanip>
#include <array>
#include <future>
#include <mutex>
#include <exception>

static constexpr unsigned num_of_thread{ 100U };

struct Node{
    unsigned data;
    Node* next;

    Node() : data{0}, next{nullptr} {}
    Node(unsigned val) : data{val}, next{nullptr} {}
};

void test1() {
    std::atomic<Node*> head{ nullptr };
    std::cout << "Is lock-free?: " << std::boolalpha << head.is_lock_free() << std::endl;

    auto add_new_node = [&head](std::mutex& mut,
                                std::condition_variable& cond_var,
                                bool& guarantor,
                                unsigned i)
    {
        // Every thread creates its own unique node, so, no race condition here
        Node* new_node{ new Node{i} };
        {
            std::unique_lock<std::mutex> lck{ mut };
            cond_var.wait(lck, [&guarantor](){ return guarantor; });
            // All threads pile up here and wait for notification
            // (basically what std::barrier does)
        }

        // Ideally, all threads hit the below line at the same time.
        // Would there be a race condition here if new_node was a shared variable?
        // (Between the "exchange" atomic operation and the assignment of return value)
        new_node->next = head.exchange(new_node);
    };

    auto pop_a_node = [&head](std::mutex& mut,
                              std::condition_variable& cond_var,
                              bool& guarantor)
        {
        Node* ptr{ head };

        {
            std::unique_lock<std::mutex> lck{ mut };
            cond_var.wait(lck, [&guarantor](){ return guarantor; });
            // All threads pile up here and wait for notification
            // (basically what std::barrier does)
        }

        // Do we break atomicity here by using head.load()->next?
        // Or even, does this question make sense? 
        while(head && !head.compare_exchange_weak(ptr, head.load()->next));

        // Using ptr from this point onwards is safe as it is local to every thread
        if(ptr) {
            unsigned retval{ ptr->data };
            delete ptr;
            
            // Protect concurrent access to std::cout (nothing else)
            // in order not to get a scrambled output
            mut.lock();
            std::cout << retval << ' ';
            mut.unlock();
        }
        else {
            mut.lock();
            std::cout << "* ";
            mut.unlock();
        }
    };

    // Half of the threads add, the other half pops nodes
    [&pop_a_node, &add_new_node]() {
        std::condition_variable cond_var{};
        bool guarantor{ false }; // Needed in order to prevent spurious wakeups
        std::mutex mut;

        std::array<std::thread, num_of_thread> threads{};
        for(unsigned i{}; i<num_of_thread; i++) {
            if(i%2U) {
                threads[i] = std::move(std::thread{add_new_node,
                                                   std::ref(mut),
                                                   std::ref(cond_var),
                                                   std::ref(guarantor),
                                                   i});
            }
            else {
                threads[i] = std::move(std::thread{pop_a_node,
                                                   std::ref(mut),
                                                   std::ref(cond_var),
                                                   std::ref(guarantor)});
            }
        }

        // Wake all threads up at the same time. This should allow
        // more race condition possibilities compared to
        // just starting new threads repeatedly 
        guarantor = true;
        cond_var.notify_all();

        for(unsigned i{}; i<num_of_thread; i++) {
            threads[i].join();
        }
    }();
}

int main() {
    test1();

    return 0;
}

这是一个示例输出: 是否无锁?: 是的

  • 49 9 71 69 95 * * * 97 3 87 * 27 81 85 51 11 29 53 5 * 31 23 19 99 45 * 55 83 13 * * 75 25 33 77 73 61 93 91 35 57 67 79 63 65 47 41 17

(* 是当线程由于遇到空列表而无法弹出元素时。我检查了丢失的元素仍在列表中。)

c++ concurrency atomic race-condition lock-free
2个回答
1
投票

在 while 循环中

while(head && !head.compare_exchange_weak(ptr, head.load()->next));

你有 3 个不同的原子操作,它们按顺序排列,但整个事情不是原子的,所以其他线程可能会进来并在它们之间做事情

  • head(加载头并检查是否为空)
  • head.load()(加载 head 然后解引用以获得
    next
  • head.compare_exchange

明显的问题是,如果两个线程同时执行“pop”操作,则可能会让第二个线程进入并在第一个线程的步骤 1 和步骤 2 之间执行其操作。如果当时堆栈上只有一个东西,那么第一个线程可以通过 null 检查,然后在第二步中获得 nullptr(并且可能会崩溃取消引用它)。

通常人们会把这个循环写成

while((ptr = head.load()) && !head.compare_exchange_weak(ptr, ptr->next));

也就是说,您只在循环中加载一次 head 值,然后使用该本地值。如果在一个线程尝试弹出时可以弹出、释放、重新分配和重新推送节点,那么这仍然存在经典的 ABA 竞争问题。这可能是问题,也可能不是问题——只要弹出线程在完全弹出节点之前不尝试对节点执行任何操作,它是否切换到其他节点并不重要。它仍然是一个已推送且未被其他人弹出的节点。


您的推送原语也有类似的问题

new_node->next = head.exchange(new_node);

存储到

new_node->next
中对于交换来说不是原子的,因此另一个线程可能在
head.exchange
之后但在存储之前访问它。所以代码应该是:

do { new_node->next = head.load(); } while (!head.compare_exchange_weak(new_node->next, new_node));

这确保存储发生在发布新推送节点的原子操作之前


1
投票

head->compareExchangeWeak(x,y)

 是一个函数调用。与任何函数调用一样,参数表达式(在本例中为 
x
y
)在调用函数之前
进行计算。 你写了

head.compare_exchange_weak(ptr, head.load()->next))

表达式、

ptr

head.load()->next
都在调用compareExchangeWeak 之前计算。传入compare_exchange_weak函数的实际
argments
只是两个指针——预期的和新的。到那时,对head.load()的呼唤已经成为古老的历史。 这能回答你的问题吗?
    

© www.soinside.com 2019 - 2024. All rights reserved.