多线程单生产者多消费者错误

问题描述 投票:0回答:0

我的生产者和消费者实施使用互斥锁和条件变量时遇到问题。

问题是程序有时永远不会完成,有时会完成,但消耗的计数不等于生成的计数。

想法是有一个生产者,它在队列中插入数字,另外还有多个消费者从队列中读取数据并对数字做一些事情(比如计算出现次数)。

我似乎无法在我的实施中找到问题所在。我感谢任何建议!

我目前的实现是这样的:

std::mutex m1;
std::condition_variable cond_var;
bool prodStarted = false;
bool done = false;

class SomeQueue {
private:
    std::queue<int> q;
public:
    void push(int value) {
        lock_guard<mutex> lock(m1);
        q.push(value);
        cond_var.notify_one();
    }

    std::shared_ptr<int> wait_and_pop() {
        std::unique_lock<std::mutex> lock(m1);
        cond_var.wait(lock, [this]{return !q.empty() || !done;});
        if(q.empty()) return nullptr;
        std::shared_ptr<int> res_ptr (std::make_shared<int>(q.front()));
        q.pop();

        return res_ptr;
    }

    bool empty() {
        std::lock_guard<std::mutex> lock(m1);
        return q.empty();
    }
};

这里是制作人:

int producer(std::string filename, SomeQueue &q) {
    int counter = 0;
    std::ifstream ifStream(filename);

    bool first = true;

    while (!ifStream.eof()) {
        prodStarted = true;
        int num;
        ifStream >> num;
        q.push(num);
        if(first) {
            lock_guard<mutex> lock(m1);
            prodStarted = true;
            first = false;
        }
    }

    {
      lock_guard<mutex> lock(m1);
      done = true;
      cond_prod.notify_all();
    }

    ifStream.close();

    return counter;
}

这是工作人员:

void worker(SomeQueue &q, std::vector<int> &number_counts) {
    while(prodStarted) {
        auto numPtr = q.wait_and_pop();
        // if the returned ptr is nullptr the queue is empty
        if(numPtr == nullptr) {
            break;
        }
        int num = *numPtr;
        // do something with the number...

        if(q.empty() && done) break;
    }
}

我的主要(截断):

SomeQueue q;
std::future<int> producer_future = async(launch::async, [&]() { return producer(filename, std::ref(q)); });

    for (int i=0;i<num_threads - 1;++i) {
        workers.push_back(
            thread(
                worker, std::ref(q), std::ref(number_counts)
            )
        );
    }

    for (int i = 0; i < num_threads - 1; ++i) {
        workers[i].join();
    }
    
    int produced_count = producer_future.get();
c++ multithreading thread-safety mutex conditional-variable
© www.soinside.com 2019 - 2024. All rights reserved.