Pausing threads for a synchronized operation


I have a situation where N threads work concurrently on a data structure in small incremental steps. Every now and then, however, a synchronized operation is required.

All threads therefore need to pause, wait while one of them performs that operation, and then continue. I'm looking for an approach that affects the threads as little as possible while no synchronized operation is needed.

A simple option would be a shared_mutex, but I think a lower-overhead alternative is possible. Below is my own attempt using a barrier and an atomic.

So my questions are: Is this a valid way to solve the problem? Is there a better solution?

#include <vector>
#include <thread>
#include <atomic>
#include <barrier>
#include <cstdlib>   // for std::rand

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::barrier barrier { nr_threads };
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (size_t i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            while (true)
            {
                if (sync_required)
                {
                    if (i == 0)
                    {
                        barrier.arrive_and_wait();
                        sync_required = false;
                        // solo synchronized work
                        barrier.arrive_and_wait();
                    }
                    else
                    {
                        barrier.arrive_and_wait();
                        barrier.arrive_and_wait();
                    }
                }

                // standard loop body ...

                // sometimes a global synchronized action is required
                if (rare_condition()) sync_required = true;
            }
        });
    }

    // eventually ... threads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}

An alternative solution using shared_mutex, which would need a condition variable?

#include <array>
#include <atomic>
#include <cstdlib>   // for std::rand
#include <thread>
#include <vector>
#include <barrier>
#include <shared_mutex>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::shared_mutex sync_mtx;
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (size_t i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            std::shared_lock shared_lock { sync_mtx };

            while (true)
            {
                // very rarely another thread requires all the others to stop for a bit
                if (sync_required)
                {
                    if (i == 0)
                    {
                        // drop the shared lock and take the mutex exclusively; seems a little odd but necessary
                        shared_lock.unlock();
                        {
                            std::unique_lock unique_lock{ sync_mtx };
                            sync_required = false;
                            // solo sync work
                        }
                        shared_lock.lock();
                    }
                    else
                    {
                        shared_lock.unlock();
                        // todo: needs a condition variable here (see the sketch after this code), which adds more complexity to this solution?
                        shared_lock.lock();
                    }
                }

                // sometimes a global synchronized action is required
                sync_required = sync_required || rare_condition();
            }
        });
    }

    // eventually ... threads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}
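
To make that todo concrete, I think the waiting branch would need roughly the following (untested sketch; the extra cv_mtx and cv are just illustrative names), which is exactly the additional complexity I was hoping to avoid:

#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::shared_mutex sync_mtx;
    std::mutex cv_mtx;                  // illustrative: protects the pause/resume handshake
    std::condition_variable cv;         // illustrative: wakes the paused threads
    std::atomic_bool sync_required{ false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (size_t i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            std::shared_lock shared_lock{ sync_mtx };

            while (true)
            {
                if (sync_required)
                {
                    if (i == 0)
                    {
                        shared_lock.unlock();
                        {
                            // exclusive access while the others are parked on the condition variable
                            std::unique_lock unique_lock{ sync_mtx };
                            // solo sync work
                        }
                        {
                            // clear the flag under cv_mtx so a waiter cannot miss the notification
                            std::lock_guard<std::mutex> lk{ cv_mtx };
                            sync_required = false;
                        }
                        cv.notify_all();
                        shared_lock.lock();
                    }
                    else
                    {
                        shared_lock.unlock();
                        {
                            std::unique_lock<std::mutex> lk{ cv_mtx };
                            cv.wait(lk, [&] { return !sync_required; });
                        }
                        shared_lock.lock();
                    }
                }

                // standard loop body ...

                if (rare_condition()) sync_required = true;
            }
        });
    }

    for (auto& thread : threads)
    {
        thread.join();
    }
}
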
c++ multithreading atomic barrier
1 Answer

The main optimization I would suggest for your code is to replace this

// Thread: if (sync_required) {
if (i == 0) { // An arbitrary thread is chosen to run sync().
    // It has to wait until the other threads acknowledge having seen sync_required
    barrier.arrive_and_wait();
    sync();

with something like this

// In main:
std::atomic<int> unpaused_count{ nr_threads }; // number of threads that haven't noticed sync_required

// Thread: if (sync_required) {
int old_unpaused_count = unpaused_count.fetch_sub(1); // This thread just noticed sync_required, update the count
if (old_unpaused_count == 1) { // The thread that notices last is chosen to run sync()
    // It can run sync() immediately, since it knows that all other
    // threads have seen sync_required, and have decided to wait.
    sync();
    // TODO: restore unpaused_count & resume other threads.
} else { // This thread isn't the last to notice: wait.

If the target platform has a wait-free std::atomic<int>::fetch_sub (hopefully true of every standard library on x64), this code now selects a thread to run sync() in a wait-free manner, and the selected thread starts running sync() immediately. This should be considerably better than locks/barriers, provided I haven't messed up that lock-free code (which is a big IF).
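
There is no portable way to query wait-freedom, but as a rough proxy you can at least verify that std::atomic<int> is lock-free on the target; a minimal standalone sketch:

#include <atomic>
#include <cstdio>

int main()
{
    // Compile-time check: std::atomic<int> is guaranteed lock-free on this target.
    static_assert(std::atomic<int>::is_always_lock_free,
                  "std::atomic<int> is not always lock-free on this platform");

    // Runtime query for a particular object.
    std::atomic<int> counter{ 0 };
    std::printf("atomic<int> lock-free at runtime: %d\n", counter.is_lock_free());
}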

My second suggestion is to use a std::counting_semaphore to let the sync() thread notify the waiting threads that the synchronization cycle is over. The use case is described well on cppreference:

Semaphores are also often used for the semantics of signalling/notifying rather than mutual exclusion, by initializing the semaphore with 0 and thus blocking the receivers that try to acquire(), until the notifier "signals" by invoking release(n). In this respect semaphores can be considered alternatives to std::condition_variables, often with better performance.
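
In isolation, that signal/notify idiom looks roughly like this (a minimal standalone sketch with two waiters; the thread names are just for illustration):

#include <semaphore>
#include <thread>

int main()
{
    // Initialized with 0: acquire() blocks until the notifier calls release().
    std::counting_semaphore<2> end_of_sync{ 0 };

    std::thread waiter_a([&] { end_of_sync.acquire(); /* resume normal work */ });
    std::thread waiter_b([&] { end_of_sync.acquire(); /* resume normal work */ });

    std::thread notifier([&]
    {
        // ... the solo synchronized work would happen here ...
        end_of_sync.release(2); // wake both waiters at once
    });

    waiter_a.join();
    waiter_b.join();
    notifier.join();
}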

With both suggested optimizations, the thread chosen to run sync() never has to acquire a lock or wait on a barrier/condition variable. That is very desirable: the faster it finishes its sync() section, the faster the whole system gets going again.
Finally, the complete code (godbolt):

#include <atomic>
#include <cstdlib>   // for std::rand
#include <limits>
#include <semaphore>
#include <thread>
#include <vector>

using ThreadCount = int;
static constexpr auto maxThreadCount = std::numeric_limits<ThreadCount>::max();

int main()
{
    const ThreadCount nr_threads = 10;
    std::vector<std::thread> threads;

    struct SharedVars
    {
        std::counting_semaphore<maxThreadCount> end_of_sync{ 0 };
        std::atomic<bool> sync_required{ false };
        std::atomic<ThreadCount> unpaused_count{ nr_threads };
    };
    SharedVars shared;

    auto rare_condition = []() { return std::rand() == 42; };

    for (ThreadCount i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&shared, rare_condition]()
        {
            while (true)
            {
                if (shared.sync_required)
                {
                    ThreadCount old_unpaused_count = shared.unpaused_count.fetch_sub(1);
                    if (old_unpaused_count == 1)
                    {
                        // SYNC section here
                        shared.unpaused_count.store(nr_threads);
                        shared.sync_required.store(false);
                        shared.end_of_sync.release(nr_threads - 1);
                    }
                    else
                    {
                        shared.end_of_sync.acquire();
                    }
                }

                // standard loop body ...

                if (rare_condition())
                {
                    shared.sync_required = true;
                }
            }
        });
    }

    for (auto& thread : threads)
    {
        thread.join();
    }
}

Weaker memory orderings than the default seq_cst could of course be used on some of the atomic operations. Reasoning about weaker memory orderings is well beyond my abilities, so I left it at that.
