锁定单个生产者、多个消费者 fifo 队列的问题

问题描述 投票:0回答:1

我有一个简单的小队列,其中一个任务从文件读取到队列中,然后几个任务解压缩内容。我工作了一段时间,但最终崩溃了,因为队列是空的,即使我在排队之前检查队列不空! (见代码中的注释)

#pragma once
#include <optional>
#include <future>
#include <queue>

template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
    using optional_queue_pair = std::optional<T>;
    bool put(optional_queue_pair&& p)
    {
        if (my_queue.size() >= MAX_SIZE) {
            std::unique_lock my_lock(my_mutex);
            my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
        }
        std::lock_guard<std::mutex> my_lock(my_mutex);
        my_queue.push(std::move(p));
        my_cv_add.notify_one();
        return true;
    }
    using optional_queue_pair = std::optional<T>;
    optional_queue_pair get()
    {
        if (my_queue.size() == 0) {
            std::unique_lock my_lock(my_mutex);
            my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
        }
        std::scoped_lock my_lock(my_mutex);
        optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
        my_queue.pop();
        my_cv_remove.notify_one();
        return ret;
    }
private:
    using optional_queue_pair = std::optional<T>;
    std::queue<optional_queue_pair> my_queue;
    std::mutex my_mutex;
    std::condition_variable my_cv_add;
    std::condition_variable my_cv_remove;
};

我试图避免这种情况,但显然我不了解互斥锁的细节!

c++ queue locking mutex multitasking
1个回答
0
投票

使用条件变量时可能会唤醒多个线程

notify_one

加锁队列后需要再次检查队列是否为空,因为可能有多个线程在等待锁,只有第一个会在队列中找到东西,其余的会发现队列为空,你还需要将整个事情放在

while (true)
中,因为队列不允许失败。

optional_queue_pair get()
{
    while (true)  // retry until succeed
    {
        if (my_queue.size() == 0) {
            std::unique_lock my_lock(my_mutex);
            my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
        }
        std::scoped_lock my_lock(my_mutex);  // multiple threads waiting on lock
        if (my_queue.size() != 0) // check again after locking the queue
        {
            optional_queue_pair ret = std::move(my_queue.front());
            my_queue.pop();
            my_cv_remove.notify_one();
            return ret;
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.