C++ 线程同步互斥锁

问题描述 投票:0回答:2

我是并行编程的新手,我正在尝试分解多线程上的迭代矩阵计算:每次迭代由两个顺序作业 A 和 B 组成;它需要等待所有线程完成作业 A,然后才能启动作业 B。

我使用互斥锁在所有线程之间进行同步,下面是一个示例脚本。当我多次运行脚本时,输出会随机变化,所以我猜我搞砸了线程同步。有人可以建议吗?谢谢!

static std::mutex mutex_A, mutex_B;
static std::condition_variable cv_A, cv_B;

int main()
{
    int Nthread=2;
    std::vector<std::thread> threads;
    threads.reserve(Nthread);
    for(int thread_id=0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([](){
            for(int iter=0; iter<3000; iter++){
                
                // launch job A

                std::unique_lock<std::mutex> lock_A{mutex_A};
                cv_A.wait(lock_A, [=](){return true;});
                cv_A.notify_all();

                // launch job B

                std::unique_lock<std::mutex> lock_B{mutex_B};
                cv_B.wait(lock_B, [=](){return true;});
                cv_B.notify_all();
            }
        })
    }
    for(auto &thread: threads)
        thread.join();

    return 0;
}
c++ multithreading synchronization mutex
2个回答
0
投票

我认为问题在于您在不释放锁的情况下通知条件变量——导致线程无限期阻塞

static std::mutex mutex_A, mutex_B;
static std::condition_variable cv_A, cv_B;

int main()
{
    int Nthread = 2;
    std::vector<std::thread> threads;
    std::atomic<int> count_A(0);
    std::atomic<int> count_B(0);

    for(int thread_id = 0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([&](){
            for(int iter = 0; iter < 3000; iter++){

                // launch job A
                {
                    std::unique_lock<std::mutex> lock_A(mutex_A);
                    // Wait for all threads to finish job A
                    if(++count_A == Nthread) {
                        count_A.store(0);
                        cv_A.notify_all();
                    } else {
                        cv_A.wait(lock_A, [&](){return count_A == 0;});
                    }
                }

                // launch job B
                {
                    std::unique_lock<std::mutex> lock_B(mutex_B);
                    // Wait for all threads to finish job B
                    if(++count_B == Nthread) {
                        count_B.store(0);
                        cv_B.notify_all();
                    } else {
                        cv_B.wait(lock_B, [&](){return count_B == 0;});
                    }
                }
            }
        });
    }

    for(auto &thread : threads)
        thread.join();

    return 0;
}
  • 使用了 2 个原子计数器(count_A + count_B)——分别跟踪有多少线程完成了作业 A|B

thread finishes job A|B -- 增加相应的计数器 + 检查它是否是最后一个完成的线程(最后一个线程 -- 重置计数器 + 通知所有等待相应条件变量的线程 -- else -- 等待条件变量直到所有线程已经完成)

  • lambda 函数——检查等待条件变量的条件(count_A == 0 | count_B == 0 确保所有线程都完成相应的工作)

希望在这些更改之后输出应该是确定性的


0
投票

不要尝试使用互斥量来协调不同线程的活动。它只会让你发疯。您应该使用互斥锁的唯一目的是防止不同的线程同时访问相同的共享数据。

如果您使用的是 C++20,那么您可以使用

std::barrier
来实现您的目标。

int main()
{
    int Nthread=2;
    std::barrier barrier(Nthread);
    std::vector<std::thread> threads;
    threads.reserve(Nthread);
    for(int thread_id=0; thread_id < Nthread; thread_id++)
    {
        threads.emplace_back([](){
            for(int iter=0; iter<3000; iter++){
                
                jobA();
                barrier.arrive_and_wait();

                // No thread can get _past_ the barrier until Nthread 
                // threads have arrived _at_ the barrier (I.E., not until
                // all of the threads have completed jobA()).
                //
                // Once Nthread threads have arrived, the threads all are 
                // released to go on to perform jobB(), and the barrier is
                // automatically reset, ready to be used again.

                jobB();
                barrier.arrive_and_wait();

                // No thread can get past the barrier until all threads have
                // completed jobB();
            }
        });
    }
    ...
}

如果您必须使用早于 C++20 的 C++ 版本,那么您的下一个最佳选择是使用

boost::barrier
。如果你做不到,那么也许可以找到
barrier
的开源实现,并将源代码合并到你的程序中。

© www.soinside.com 2019 - 2024. All rights reserved.