我在Windows 7 64位VS2013(x64发行版)上尝试内存排序。我想使用最快的同步共享对容器的访问。我选择了原子比较和交换。
我的程序产生两个线程。作家将一个向量推入,而读者则可以检测到。
最初我没有指定任何内存顺序,所以我假设它使用memory_order_seq_cst
?
使用memory_order_seq_cst
,每操作延迟为340-380个周期
为了尝试提高性能,我使商店使用memory_order_release
,而装入时使用memory_order_acquire
。
但是,每操作延迟增加到大约1,940个周期。
我误解了吗?完整代码如下。
使用默认memory_order_seq_cst
:
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
std::atomic<bool> _lock{ false };
std::vector<uint64_t> _vec;
std::atomic<uint64_t> _total{ 0 };
std::atomic<uint64_t> _counter{ 0 };
static const uint64_t LIMIT = 1000000;
void writer()
{
while (_counter < LIMIT)
{
bool expected{ false };
bool val = true;
if (_lock.compare_exchange_weak(expected, val))
{
_vec.push_back(__rdtsc());
_lock = false;
}
}
}
void reader()
{
while (_counter < LIMIT)
{
bool expected{ false };
bool val = true;
if (_lock.compare_exchange_weak(expected, val))
{
if (_vec.empty() == false)
{
const uint64_t latency = __rdtsc() - _vec[0];
_total += (latency);
++_counter;
_vec.clear();
}
_lock = false;
}
}
}
int main()
{
std::thread t1(writer);
std::thread t2(reader);
t2.detach();
t1.join();
std::cout << _total / _counter << " cycles per op" << std::endl;
}
使用memory_order_acquire
和memory_order_release
:
void writer()
{
while (_counter < LIMIT)
{
bool expected{ false };
bool val = true;
if (_lock.compare_exchange_weak(expected, val, std::memory_order_acquire))
{
_vec.push_back(__rdtsc());
_lock.store(false, std::memory_order_release);
}
}
}
void reader()
{
while (_counter < LIMIT)
{
bool expected{ false };
bool val = true;
if (_lock.compare_exchange_weak(expected, val, std::memory_order_acquire))
{
if (_vec.empty() == false)
{
const uint64_t latency = __rdtsc() - _vec[0];
_total += (latency);
++_counter;
_vec.clear();
}
_lock.store(false, std::memory_order_release);
}
}
}
您没有任何保护措施防止线程在释放它后立即再次获得该锁,只是发现_vec.empty()
为not为假,或存储另一个TSC值,从而覆盖了该锁从未被看到的那个读者。 我怀疑您的更改使读者浪费了更多时间来阻止作者(反之亦然),从而导致实际吞吐量降低。
_lock.store(false, seq_cst);
编译为xchg
,而不是普通的mov
存储。它必须等待存储缓冲区耗尽,并且只是普通的慢速[[1(例如,在Skylake上,微编码为8微码,对于许多重复的背对背操作而言,每23个周期1个吞吐量), -争用的情况,它在L1d缓存中已经很热。您没有指定有关所拥有硬件的任何信息。
_lock.store(false, std::memory_order_release);
只是编译为普通的mov
存储,没有任何额外的屏障指令。因此,_counter
的重新加载可以与其并行发生(尽管分支预测+投机执行使该问题不再存在)。更重要的是,下一次CAS尝试进行锁定实际上可以更早尝试。[当有多个内核锤击时,可能会有硬件仲裁来访问高速缓存行,大概带有一些公平的启发法,但是我不知道细节是否已知。
您可以通过不同地使用锁来完全解决此问题:
写作者等待false
,然后在完成后存储true
。读者则相反。因此,在没有其他线程转弯的情况下,作者永远无法重新输入关键部分。 (当您“等待一个值”时,对它进行只读加载而不是对CAS进行加载。x86上的CAS需要高速缓存行的独占所有权,从而防止了其他线程的读取。只有一个读取器和一个写入器,您才能不需要任何原子RMW即可工作。)[如果您有多个读取器和多个写入器,则可以有一个4状态同步变量,其中写入器尝试将CAS的值从0变为1,然后在完成时存储2。读者尝试从2到3进行CAS,然后在完成后存储0。
xchg
的速度不及mov
+ mfence
。这是在x86上实现seq_cst纯存储的最佳方法。但是它比普通的mov
慢。machine_clears.memory_ordering
(Why flush the pipeline for Memory Order Violation caused by other logical processors?)的性能计数器。如果您只是想测量线程间的延迟,您还会遇到很多非常复杂的事情。
[您都有两个线程读取编写器每次更新的_total
,而不是在完成所有操作时仅存储标志。
您在阅读器中的RMW原子增量也为_counter
,即使该变量是阅读器专用的。它可以是在reader.join()
之后读取的普通非原子全局变量,甚至更好的是可以是仅在循环后存储到全局变量的局部变量。 (由于发行版存储的原因,一个普通的非原子全局变量可能仍会在每次迭代时最终存储到内存中,而不是保存在寄存器中。由于这是一个很小的程序,因此所有全局变量可能彼此相邻,并且可能在同一缓存行中。)
std::vector
也是不必要的
__rdtsc()
环绕64位计数器1,否则不会为零,因此您可以将0
用作标量uint64_t
中的标记值,以表示为空。或者,如果您修复了锁定问题,以使读者在没有作者转弯的情况下无法重新进入关键部分,则可以删除该支票。脚注2:对于〜4GHz TSC参考频率,即2 ^ 64/10 ^ 9秒,足够接近2 ^ 32秒〜= 136年,可以环绕TSC。请注意,TSC参考频率为当前内核时钟频率的not值;对于给定的CPU,它固定为某个值。通常接近额定“贴纸”频率,而不是最大涡轮频率。
_
的名称。不要将它们用于您自己的变量。 (通常在任何地方都没有。如果您确实需要,可以使用下划线代替。)