多线程单生产者多消费者问题[关闭]

问题描述 投票:0回答:0

我的生产者和消费者实施使用互斥锁和条件变量时遇到问题。制作人看数字没问题,所以问题不在那里。

我正在使用 SomeQueue 类中的互斥锁来锁定推送、空检查以及等待和弹出方法。

问题是程序要么永远不会完成,要么完成但消耗的计数不等于生成的计数(生成的计数是正确的)。

想法是有一个生产者,它在队列中插入数字,另外还有多个消费者从队列中读取数据并对数字做一些事情(比如计算出现次数)。

我似乎无法在我的实施中找到问题所在。我感谢任何建议!

我目前的实现是这样的:

std::mutex m1;
std::condition_variable cond_var;
bool prodStarted = false;
bool done = false;

class SomeQueue {
private:
    std::queue<int> q;
public:
    void push(int value) {
        lock_guard<mutex> lock(m1);
        q.push(value);
        cond_var.notify_one();
    }

    std::shared_ptr<int> wait_and_pop() {
        std::unique_lock<std::mutex> lock(m1);
        cond_var.wait(lock, [this]{return !q.empty() || !done;});
        if(q.empty()) return nullptr;
        std::shared_ptr<int> res_ptr (std::make_shared<int>(q.front()));
        q.pop();

        return res_ptr;
    }

    bool empty() {
        std::lock_guard<std::mutex> lock(m1);
        return q.empty();
    }
};

这里是制作人:

int producer(std::string filename, SomeQueue &q) {
    int counter = 0;
    std::ifstream ifStream(filename);

    bool first = true;

    while (!ifStream.eof()) {
        prodStarted = true;
        int num;
        ifStream >> num;
        q.push(num);
        if(first) {
            lock_guard<mutex> lock(m1);
            prodStarted = true;
            first = false;
        }
    }

    {
      lock_guard<mutex> lock(m1);
      done = true;
      cond_prod.notify_all();
    }

    ifStream.close();

    return counter;
}

这是工作人员:

void worker(SomeQueue &q, std::vector<int> &number_counts) {
    while(prodStarted) {
        auto numPtr = q.wait_and_pop();
        // if the returned ptr is nullptr the queue is empty
        if(numPtr == nullptr) {
            break;
        }
        int num = *numPtr;
        // do something with the number...

        if(q.empty() && done) break;
    }
}

我的主要(截断):

SomeQueue q;
std::future<int> producer_future = async(launch::async, [&]() { return producer(filename, std::ref(q)); });

for (int i=0;i<num_threads - 1;++i) {
    workers.push_back(
        thread(
            worker, std::ref(q), std::ref(number_counts)
       )
    );
}
for (int i = 0; i < num_threads - 1; ++i) {
    workers[i].join();
}    
int produced_count = producer_future.get();
c++ multithreading parallel-processing thread-safety producer-consumer
© www.soinside.com 2019 - 2024. All rights reserved.