原子值的部分比较和完全交换

问题描述 投票:2回答:3

问题如下。

给出一个具有两部分的POD对象:索引数据。我想通过仅检查索引是否相等的条件对其执行原子条件交换操作。

类似这样的东西:

struct Data { size_t m_index; char m_data; };
std::atomic<Data> dd; // some initialization
Data zz; // some initialization

// so I want something like this
dd.exchange_if_equals<&Data::m_index>(10,zz);

所以这是一种“ partial-compare-and-full-swap”操作。也许这需要对Data::m_index进行适当的对齐。

显然std::atomic不支持此功能,但是一个人可以自己实现此功能,或者可能有另一个库支持此功能?

c++ memory-alignment stdatomic compare-and-swap
3个回答
2
投票

我认为您必须先进行加载,然后自定义条件,然后进行比较并交换,其中比较是当前值完全等于读取值。如果最后一步失败,则循环。

template<class T, class F>
bool swap_if(std::atomic<T>& atomic, T desired, F&& condition) {
    for (;;) {
        T data = atomic.load();
        if (!condition(data)) break;
        if (atomic.compare_exchange_weak(data, desired)) return true;
    }
    return false;
}

http://coliru.stacked-crooked.com/a/a394e336628246a9

由于复杂性,您可能应该只使用互斥量。另外,由于std::atomic<Data>太大,因此Data可能已经在盖子下使用了互斥锁。


0
投票

就像C ++一样,硬件CAS(例如x86-64或ARMv8.1)在asm中不支持此功能,您必须自己动手。

在C ++中,这非常简单:加载原始值并替换其中的一部分。如果另一个核心更改了您不想与之相对的另一部分,那么这当然会导致虚假故障。

如果可能,请使用unsigned m_index而不是size_t,因此整个结构在典型的64位计算机上可以容纳8个字节,而不是16。16个字节的原子速度更慢(尤其是纯负载)部分)在x86-64上,或者在某些实现和/或某些ISA上甚至根本没有锁。请参见How can I implement ABA counter with c++11 CAS? re:具有当前GCC /叮当声的x86-64 lock cmpgxchg16b

如果每个atomic<>访问分别获得一个锁,那么最好在整个自定义比较和设置周围使用一个互斥体会好得多。


#include <atomic>
struct Data {
    // without alignment, clang's atomic<Data> doesn't inline load + CAS?!?  even though return d.is_always_lock_free; is true
    alignas(long long)  char m_data;
    unsigned m_index;               // this last so compilers can replace it slightly more efficiently
};

inline bool partial_cas_weak(std::atomic<Data> &d, unsigned expected_idx, Data zz, std::memory_order order = std::memory_order_seq_cst)
{
    Data expected = d.load(std::memory_order_relaxed);
    expected.m_index = expected_idx;            // new index, same everything else
    return d.compare_exchange_weak(expected, zz, order);
    // updated value of "expected" discarded on CAS failure
    // If you make this a retry loop, use it instead of repeated d.load
}

这在x86-64(Godbolt的clang中实际上很好地编译,内联到传递编译时常数order的调用程序中(否则clang在该order arg上发疯了分支)用于该功能的独立非内联版本)

# clang10.0 -O3 for x86-64
test_pcw(std::atomic<Data>&, unsigned int, Data):
    mov     rax, qword ptr [rdi]                  # load the whole thing
    shl     rsi, 32
    mov     eax, eax                              # zero-extend the low 32 bits, clearing m_index
    or      rax, rsi                              # OR in a new high half = expected_idx
    lock            cmpxchg qword ptr [rdi], rdx      # the actual 8-byte CAS
    sete    al                                        # boolean FLAG result into register
    ret

不幸的是,编译器太笨了,无法仅加载它们实际需要的原子结构的一部分,而是加载整个东西,然后将它们不需要的部分归零。 (有关工会黑客的信息,请参见How can I implement ABA counter with c++11 CAS?,以解决某些编译器上的问题。)

[不幸的是,GCC使凌乱的asm临时存储/重新加载到堆栈,从而导致存储转发停顿。 GCC也将char m_data之后的填充置零(无论是第一个还是最后一个成员),如果内存中的实际对象具有非零填充,则可能导致CAS始终失败。如果纯存储和初始化始终将其设置为零,那可能是不可能的。


[ARM]或PowerPC之类的LL/SC machine可以很容易地在汇编中完成此操作(比较/分支是在加载链接和存储条件之间手动进行),但是没有可移植的库。 (最重要的是,因为它无法针对x86之类的机器进行编译,并且因为在LL / SC事务中可以执行的操作受到严格限制,并且调试模式溢出/重新加载本地var可能导致代码始终失败。)


0
投票

如果选择使用std::mutex而不是atomic,则可以将互斥锁放在自己的原子状包装中。

这里是它的开始:

#include <iostream>
#include <type_traits>
#include <mutex>

template<typename T>
class myatomic {
public:
    static_assert(
        // std::is_trivially_copyable_v<T> && // used in std::atomic, not needed here
        std::is_copy_constructible_v<T> &&
        std::is_move_constructible_v<T> &&
        std::is_copy_assignable_v<T> &&
        std::is_move_assignable_v<T>, "unsupported type");

    using value_type = T;

    myatomic() : data{} {}
    explicit myatomic(const T& v) : data{v} {}

    myatomic(const myatomic& rhs) : myatomic(rhs.load()) {}

    myatomic& operator=(const myatomic& rhs) {
        std::scoped_lock lock(mtx, rhs.mtx);
        data = rhs.data;
        return *this;
    }

    T load() const {
        const std::lock_guard<std::mutex> lock(mtx);
        return data;
    }

    operator T() const {
        return load();
    }

    void store(const T& v) {
        const std::lock_guard<std::mutex> lock(mtx);
        data = v;
    }

    myatomic& operator=(const T& v) {
        store(v);
        return *this;
    }

    template<typename Mptr, typename V>
    bool exchange_if_equals(Mptr mvar, V mval, V oval) {
        const std::lock_guard<std::mutex> lock(mtx);
        if(data.*mvar == mval) {
            data.*mvar = oval;
            return true;
        }
        return false;
    }

    template<typename Mptr>
    auto get(Mptr mvar) const {
        const std::lock_guard<std::mutex> lock(mtx);
        return data.*mvar;
    }

    template<typename Mptr, typename V>
    void set(Mptr mvar, const V& v) {
        const std::lock_guard<std::mutex> lock(mtx);
        data.*mvar = v;
    }

private:
    mutable std::mutex mtx;
    T data;
};

struct Data {
    size_t m_index;
    char m_data;
};

int main() {
    Data orig{10, 'a'};

    myatomic<Data> dd(orig);
    dd.exchange_if_equals(&Data::m_index, 10U, 20U);
    std::cout << dd.get(&Data::m_index);
}
© www.soinside.com 2019 - 2024. All rights reserved.