我的生产者和消费者实施使用互斥锁和条件变量时遇到问题。制作人看数字没问题,所以问题不在那里。
我正在使用 SomeQueue 类中的互斥锁来锁定推送、空检查以及等待和弹出方法。
问题是程序要么永远不会完成,要么完成但消耗的计数不等于生成的计数(生成的计数是正确的)。
想法是有一个生产者,它在队列中插入数字,另外还有多个消费者从队列中读取数据并对数字做一些事情(比如计算出现次数)。
我似乎无法在我的实施中找到问题所在。我感谢任何建议!
我目前的实现是这样的:
std::mutex m1;
std::condition_variable cond_var;
bool prodStarted = false;
bool done = false;
class SomeQueue {
private:
std::queue<int> q;
public:
void push(int value) {
lock_guard<mutex> lock(m1);
q.push(value);
cond_var.notify_one();
}
std::shared_ptr<int> wait_and_pop() {
std::unique_lock<std::mutex> lock(m1);
cond_var.wait(lock, [this]{return !q.empty() || !done;});
if(q.empty()) return nullptr;
std::shared_ptr<int> res_ptr (std::make_shared<int>(q.front()));
q.pop();
return res_ptr;
}
bool empty() {
std::lock_guard<std::mutex> lock(m1);
return q.empty();
}
};
这里是制作人:
int producer(std::string filename, SomeQueue &q) {
int counter = 0;
std::ifstream ifStream(filename);
bool first = true;
while (!ifStream.eof()) {
prodStarted = true;
int num;
ifStream >> num;
q.push(num);
if(first) {
lock_guard<mutex> lock(m1);
prodStarted = true;
first = false;
}
}
{
lock_guard<mutex> lock(m1);
done = true;
cond_prod.notify_all();
}
ifStream.close();
return counter;
}
这是工作人员:
void worker(SomeQueue &q, std::vector<int> &number_counts) {
while(prodStarted) {
auto numPtr = q.wait_and_pop();
// if the returned ptr is nullptr the queue is empty
if(numPtr == nullptr) {
break;
}
int num = *numPtr;
// do something with the number...
if(q.empty() && done) break;
}
}
我的主要(截断):
SomeQueue q;
std::future<int> producer_future = async(launch::async, [&]() { return producer(filename, std::ref(q)); });
for (int i=0;i<num_threads - 1;++i) {
workers.push_back(
thread(
worker, std::ref(q), std::ref(number_counts)
)
);
}
for (int i = 0; i < num_threads - 1; ++i) {
workers[i].join();
}
int produced_count = producer_future.get();