我遇到过这样的情况:N 个线程以小的增量步骤同时处理数据结构。然而,有时需要进行同步操作。
因此所有线程都需要暂停,等待其中一个线程执行此操作,然后继续。我正在寻找一种在不需要同步操作时对线程影响尽可能小的方法。
一个简单的选择是使用
shared_mutex
,但我认为开销较低的选项是可能的。我尝试使用下面的屏障和原子来尝试自己的解决方案。
所以我的问题是: 这是解决问题的有效方法吗? 有更好的解决办法吗?
#include <vector>
#include <thread>
#include <atomic>
#include <barrier>
int main()
{
const size_t nr_threads = 10;
std::vector<std::thread> threads;
std::barrier barrier { nr_threads };
std::atomic_bool sync_required { false };
auto rare_condition = []() { return std::rand() == 42; };
for (int i = 0; i < nr_threads; ++i)
{
threads.emplace_back([&, i]()
{
while (true)
{
if (sync_required)
{
if (i == 0)
{
barrier.arrive_and_wait();
sync_required = false;
// solo synchronized work
barrier.arrive_and_wait();
}
else
{
barrier.arrive_and_wait();
barrier.arrive_and_wait();
}
}
// standard loop body ...
// sometimes a global synchronized action is required
if (rare_condition()) sync_required = true;
}
});
}
// eventually ... treads quit
for (auto& thread : threads)
{
thread.join();
}
}
使用shared_mutex的另一种解决方案需要条件变量?
#include <array>
#include <atomic>
#include <thread>
#include <vector>
#include <barrier>
#include <shared_mutex>
int main()
{
const size_t nr_threads = 10;
std::vector<std::thread> threads;
std::shared_mutex sync_mtx;
std::atomic_bool sync_required { false };
auto rare_condition = []() { return std::rand() == 42; };
for (int i = 0; i < nr_threads; ++i)
{
threads.emplace_back([&, i]()
{
std::shared_lock shared_lock { sync_mtx };
while (true)
{
// very rarely another thread requires all the others to stop for a bit
if (sync_required)
{
if (i == 0)
{
// unlock shared, but lock unique, seems a little odd but neccesary
shared_lock.unlock();
{
std::unique_lock unique_lock{ shared_lock };
sync_required = false;
// solo sync work
}
shared_lock.lock();
}
else
{
shared_lock.unlock();
// todo: need condition variable, which adds more complexity to this solution?
shared_lock.lock();
}
}
// sometimes a global syncronized action is required
sync_required = sync_required || rare_condition();
}
});
}
// eventually ... treads quit
for (auto& thread : threads)
{
thread.join();
}
}
我对您的代码提出的主要优化是替换以下内容
// Thread: if (sync_required) {
if (i == 0) { // An arbitrary thread is choosen to run sync().
// It has to wait until other threads aknowledges having seen sync_required
barrier.arrive_and_wait();
sync();
通过这样的事情
// In main:
std::atomic<int> unpaused_count{ nr_threads }; // number of threads that haven't noticed sync_required
// Thread: if (sync_required) {
int old_unpaused_count = unpaused_count.fetch_sub(1); // This thread just noticed sync_required, update the count
if (old_unpaused_count == 1) { // The thread noticing last is choosen to run sync()
// It can run sync() immediately, since it knows that all other
// threads have seen sync_required, and have decided to wait.
sync();
// TODO: restore unpaused_count & resume other threads.
} else { // This thread isn't the last to notice: wait.
如果目标平台具有无等待功能
std::atomic<int>::fetch_sub
(希望 x64 上的所有标准库都是如此),则此代码现在可以选择一个线程以无等待方式运行 sync()
,并且所选线程开始运行立即sync()
。这应该比锁/栅栏好得多,
如果我没有弄乱那段无锁代码(那是一个很大的 IF)。
我的第二个建议是使用
std::counting_semaphore
让sync()
线程通知等待线程同步周期已经结束。 cppreference上很好地描述了用例:
信号量也经常用于信号/通知的语义,而不是互斥,通过用0初始化信号量,从而阻止尝试 acquire() 的接收者,直到通知者通过调用release“发出信号” (n)。在这方面,信号量可以被视为 std::condition_variables 的替代品,通常具有更好的性能。
通过建议的两个优化,选择运行的线程永远不需要获取锁,或等待某些屏障/条件变量。这是非常理想的:完成其
sync()
部分的速度越快,整个系统重新启动的速度就越快。最后,完整代码(godbolt)
在某些原子操作上当然可以使用比
#include <atomic>
#include <limits>
#include <semaphore>
#include <thread>
#include <vector>
using ThreadCount = int;
static constexpr auto maxThreadCount = std::numeric_limits<ThreadCount>::max();
int main() {
const ThreadCount nr_threads = 10;
std::vector<std::thread> threads;
struct SharedVars {
std::counting_semaphore<maxThreadCount> end_of_sync{ 0 };
std::atomic<bool> sync_required{ false };
std::atomic<ThreadCount> unpaused_count{ nr_threads };
};
SharedVars shared;
auto rare_condition = []() { return std::rand() == 42; };
for (ThreadCount i = 0; i < nr_threads; ++i) {
threads.emplace_back([&shared, rare_condition]() {
while (true) {
if (shared.sync_required) {
ThreadCount old_unpaused_count = shared.unpaused_count.fetch_sub(1);
if (old_unpaused_count == 1) {
// SYNC section here
shared.unpaused_count.store(nr_threads);
shared.sync_required.store(false);
shared.end_of_sync.release(nr_threads - 1);
} else {
shared.end_of_sync.acquire();
}
}
// standard loop body ...
if (rare_condition()) {
shared.sync_required = true;
}
}
});
}
for (auto& thread : threads) {
thread.join();
}
}
更弱的内存顺序。推理较弱的内存顺序远远超出了我的能力,所以我就这样离开了。