我有一个简单的小队列,其中一个任务从文件读取到队列中,然后几个任务解压缩内容。我工作了一段时间,但最终崩溃了,因为队列是空的,即使我在排队之前检查队列不空! (见代码中的注释)
#pragma once
#include <optional>
#include <future>
#include <queue>
template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
using optional_queue_pair = std::optional<T>;
bool put(optional_queue_pair&& p)
{
if (my_queue.size() >= MAX_SIZE) {
std::unique_lock my_lock(my_mutex);
my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
}
std::lock_guard<std::mutex> my_lock(my_mutex);
my_queue.push(std::move(p));
my_cv_add.notify_one();
return true;
}
using optional_queue_pair = std::optional<T>;
optional_queue_pair get()
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::scoped_lock my_lock(my_mutex);
optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
my_queue.pop();
my_cv_remove.notify_one();
return ret;
}
private:
using optional_queue_pair = std::optional<T>;
std::queue<optional_queue_pair> my_queue;
std::mutex my_mutex;
std::condition_variable my_cv_add;
std::condition_variable my_cv_remove;
};
我试图避免这种情况,但显然我不了解互斥锁的细节!
使用条件变量时可能会唤醒多个线程
notify_one
加锁队列后需要再次检查队列是否为空,因为可能有多个线程在等待锁,只有第一个会在队列中找到东西,其余的会发现队列为空,你还需要将整个事情放在
while (true)
中,因为队列不允许失败。
optional_queue_pair get()
{
while (true) // retry until succeed
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::scoped_lock my_lock(my_mutex); // multiple threads waiting on lock
if (my_queue.size() != 0) // check again after locking the queue
{
optional_queue_pair ret = std::move(my_queue.front());
my_queue.pop();
my_cv_remove.notify_one();
return ret;
}
}
}