ARM上的无锁SPSC队列实现

问题描述 投票:0回答:1

[我正在尝试为ARM写一个生产者单一消费者队列,我想我已经将DMB包裹了一下,但是需要一些检查(我对std :: atomic更加熟悉。]

我在这里:

bool push(const_reference value)
{
    // Check for room
    const size_type currentTail = tail;
    const size_type nextTail = increment(currentTail);
    if (nextTail == head)
        return false;

    // Write the value
    valueArr[currentTail] = value;

    // Prevent the consumer from seeing the incremented tail before the
    // value is written.
    __DMB();

    // Increment tail
    tail = nextTail;

    return true;
}

bool pop(reference valueLocation)
{
    // Check for data
    const size_type currentHead = head;
    if (currentHead == tail)
        return false;

    // Write the value.
    valueLocation = valueArr[currentHead];

    // Prevent the producer from seeing the incremented head before the
    // value is written.
    __DMB();

    // Increment the head
    head = increment(head);

    return true;
}

我的问题是:我的DMB放置和理由是否正确?还是仍然了解我失踪了?对于处理由另一个线程(或中断)更新的变量时,条件是否需要某种保护,我尤其不确定。

c++ arm synchronization lock-free memory-barriers
1个回答
0
投票
  • 一个障碍是必要的,但还不够,您还需要“获取”语义来加载由另一个线程修改的var。 (或者至少是consume,但要获得障碍就必须使用asm来创建数据依赖项。在已经具有控制依赖项的情况下,编译器不会这样做。)
  • 单核系统只能使用像GNU C asm("":::"memory")std::atomic_signal_fence(std::memory_order_release)这样的编译器屏障,而不能使用dmb。创建一个宏,以便您可以在SMP安全屏障或UP(单处理器)屏障之间进行选择。
  • [head = increment(head);是对head的无意义重载,请使用本地副本。
  • 使用std::atomic可移植地获取必要的代码。

您通常不需要滚动自己的原子;用于ARM的现代编译器确实实现了std::atomic<T>。但是,对于AFAIK,没有std::atomic<>实施会意识到单核系统来避免实际的障碍,只是为了安全起见。可能导致上下文切换的中断。

在单核系统上,您不需要dsb,只需编译器障碍。 CPU将保留按程序顺序顺序执行的asm指令的错觉。您只需要确保编译器生成了按正确顺序执行操作的asm。您可以通过将std::atomicstd::memory_order_relaxed以及手动atomic_signal_fence(memory_order_acquire)release屏障一起使用来实现。 (不是atomic_thread_fence;这会发出asm指令,通常是dsb。)


每个线程读取另一个线程修改的变量。通过确保只有在访问数组后才能看到修改发布存储库,您才能正确制作它们。

但是这些读取也必须是acquire-loads才能与这些发行存储同步。例如。以确保在push完成读取同一元素之前valueArr[currentTail] = value;没有写入pop。或在未完全撰写之前阅读条目。

[没有任何障碍,失败模式将是if (currentHead == tail) return false;直到之后才实际从内存检查tail的值valueLocation = valueArr[currentHead];发生。运行时负载重新排序可以轻松地在弱排序的ARM上做到这一点。如果加载地址具有对tail的数据依赖关系,则可以避免在SMP系统上存在障碍(ARM保证在asm中对依赖关系进行排序; mo_consume应该公开的功能)。但是,如果编译器仅发出一个分支,则仅是控件依赖性,而不是数据。如果您是用asm手工编写的,则通过比较设置的标志上的ldrne r0, [r1, r2]之类的谓词负载,我认为会创建data依赖项。

编译时重新排序似乎不太合理,但是仅编译器的障碍是免费的,如果它只是阻止编译器执行它本来不会做的事情。


未经测试的实现,编译为看起来不错的asm,但没有其他测试

push做类似的事情。我包括用于获取负载/存储释放的包装函数,以及fullbarrier()。 (等效于Linux内核的smp_mb()宏,定义为编译时或编译+运行时屏障。)

#include <atomic>

#define UNIPROCESSOR


#ifdef UNIPROCESSOR
#define fullbarrier()  asm("":::"memory")   // GNU C compiler barrier
                          // atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB()    // or atomic_thread_fence(std::memory_order_seq_cst)
#endif

template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
    T tmp = x.load(std::memory_order_relaxed);
    std::atomic_signal_fence(std::memory_order_acquire);
    // or fullbarrier();  if you want to use that macro
    return tmp;
#else
    return x.load(std::memory_order_acquire);
    // fullbarrier() / __DMB();
#endif
}

template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
    std::atomic_signal_fence(std::memory_order_release);
    // or fullbarrier();
    x.store(val, std::memory_order_relaxed);
#else
    // fullbarrier() / __DMB(); before plain store
    return x.store(val, std::memory_order_release);
#endif
}

template <class T>
struct SPSC_queue {
  using size_type = unsigned;
  using value_type = T;
  static const size_type size = 1024;

  std::atomic<size_type> head;
  value_type valueArr[size];
  std::atomic<size_type> tail;  // in a separate cache-line from head to reduce contention

  bool push(const value_type &value)
  {
    // Check for room
    const size_type currentTail = tail.load(std::memory_order_relaxed);  // no other writers to tail, no ordering needed
    const size_type nextTail = currentTail + 1;    // modulo separately so empty and full are distinguishable.
    if (nextTail == load_acquire(head))
        return false;

    valueArr[currentTail % size] = value;
    store_release(tail, nextTail);
    return true;
  }
};

// instantiate the template for  int  so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);

[如果您使用on the Godbolt compiler explorer,请使用-DUNIPROCESSOR干净地编译带有零障碍的g++9.2 -O3 -mcpu=cortex-a15(只是选择一个随机的现代ARM内核,以便GCC可以为非单处理器内联std::atomic加载/存储功能和障碍情况。

© www.soinside.com 2019 - 2024. All rights reserved.