我使用
std::thread
编写了一个多线程 C++ 程序,如下所示:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
const int threadNum = 4;
mutex mt[threadNum];
condition_variable cv[threadNum];
thread threadList[threadNum];
bool threadWork[threadNum];
void work(int id) {
while (true) {
unique_lock<mutex> lck(mt[id]);
cv[id].wait(lck, [&]() { return threadWork[id]; }); // wait for incoming tasks
// do something
threadWork[id] = false;
cv[id].notify_all();
}
}
int main() {
for (int i = 0; i < threadNum; i ++) {
threadWork[i] = false;
threadList[i] = thread(work, i);
}
while (true) {
for (int i = 0; i < threadNum; i ++) {
// allocate tasks for each threads
threadWork[i] = true;
cv[i].notify_all();
}
for (int i = 0; i < threadNum; i ++) {
// wait until all tasks finish
unique_lock<mutex> lck(mt[i]);
cv[i].wait(lck, [&]() { return !threadWork[i]; });
cout << "Finish thread " << i << endl;
}
// do something
}
for (int i = 0; i < threadNum; i ++)
threadList[i].join();
return 0;
}
程序迭代时,主线程和子线程会交替执行。主线程首先会为多个子线程分配任务。当所有线程完成任务后,主线程将聚合信息并进行一些其他计算。
但是,在执行过程中,程序会随机迭代几次后崩溃或陷入死锁。我不知道为什么会发生这种情况。有什么问题吗?
我的环境:ubuntu 22.04,g++ 11.4
下面代码中的注释:
//...
mutex mt[threadNum]; // Mutexes are for synchronizing
// between threads. Having 1 mutex
// per thread is not proper the way
// to guard access to data.
// try having 1 mutex per block of
// data that you want guarded instead.
condition_variable cv[threadNum]; // same applies for the condition variables
// you want to indicate that the DATA
// is ready.
// ...
// The rest of the code will need a bit of reordering.
您应该尝试为您的实验做一些更简单的事情,比方说,一开始只有一个受保护的数据块。下面是一个例子。
请注意下面的代码,工作线程等待一个唯一的条件变量,告诉它们需要完成某些操作。
我还添加了一个简单的机制,可以在工作完成后优雅地退出程序。这与其他内容一样重要,请注意atomic<>的使用,它保证所有核心都会在值发生变化后立即看到它。
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <atomic>
std::mutex my_data_mutex;
std::condition_variable my_data_cv;
std::vector<int> my_data;
constexpr size_t threadNum = 4;
std::atomic<bool> halt = false;
void work(int id) {
for(;;)
{
std::unique_lock<std::mutex> _(my_data_mutex);
my_data_cv.wait(_); // wait for incoming tasks
if (halt)
return;
// do something
my_data.push_back(my_data.back() + id);
}
}
int main() {
std::vector<std::thread> tasks;
for (int i = 0; i < threadNum; i++) {
tasks.emplace_back(work, i);
}
while(!halt)
{
std::unique_lock<std::mutex> _(my_data_mutex);
my_data.push_back(0); // do some silly job
if (my_data.size() > 1'000'000) // exit when job is done.
halt = true; // tell all that the jobn is done
my_data_cv.notify_all(); // tell all threads to do their silly part
}
for (auto& t : tasks)
t.join();
return 0;
}