我已经在 C++ 中实现了一个非常标准的单消费者-多生产者模式,此外还有队列中任务数量的限制。
Worker 在单独的线程上运行消息队列。任务从生产者发送给工人。如果队列中已经有
max_num_tasks_
,生产者必须等待。
使用工具集 v143 在 VS2022 for windows x64 中编译。
以下代码零星地挂在
Worker::Send()
语句cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
处。有人看到我做错了什么吗?
工人的实施
#pragma once
#include <queue>
#include <mutex>
#include <functional>
class Worker {
public:
~Worker() {
Send([this] {done = true; });
thd.join();
}
Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
while (!done) {
mq.PopFront()();
cv_.notify_all();
}
})
{ }
void Send(std::function<void()>&& m) {
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
}
mq.PushBack(std::move(m));
}
private:
bool done;
size_t max_num_tasks_;
ThreadSafeQueue<std::function<void()>> mq;
std::thread thd;
std::mutex m_;
std::condition_variable cv_;
};
ThreadSafeQueue 的实现
#pragma once
#include <queue>
#include <mutex>
#include <functional>
template <typename T>
class ThreadSafeQueue {
public:
void PushBack(T&& val) {
{
std::unique_lock<std::mutex> lock(q_mutex);
q.push(std::move(val));
}
cv.notify_one();
}
void PushBack(const T& val) {
{
std::unique_lock<std::mutex> lock(q_mutex);
q.push(val);
}
cv.notify_one();
}
T PopFront() {
std::unique_lock<std::mutex> lock(q_mutex);
cv.wait(lock, [&] { return (!q.empty()); });
T v = q.front();
q.pop();
return std::move(v);
}
bool Empty() const {
std::unique_lock<std::mutex> lock(q_mutex);
return q.empty();
}
size_t Size() const {
std::unique_lock<std::mutex> lock(q_mutex);
return q.size();
}
private:
mutable std::mutex q_mutex;
std::condition_variable cv;
std::queue<T> q;
};
偶尔触发问题的单元测试示例:
TEST(WorkerTests, Compute_PI) {
std::atomic<double> value;
auto multiply_by_pi_over_four = [&value] {
int n = 750;
double v = 0;
for (int i = 0; i < n; i++) {
v += std::pow(-1, i) / (2 * i + 1);
}
value = value * v;
};
auto add_pi_over_four = [&value] {
int n = 750;
double v = 0;
for (int i = 0; i < n; i++) {
v += std::pow(-1, i) / (2 * i + 1);
}
value = value + v;
};
auto add_one = [&value] {
value = value + 1;
std::this_thread::sleep_for(std::chrono::milliseconds(3));
};
auto multiply_by_three = [&value] {
value = value * 3;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
for (int i = 0; i < 100; i++) {
value = 0.0;
{
Worker worker(1);
worker.Send(add_one);
worker.Send(multiply_by_pi_over_four);
worker.Send(multiply_by_three);
worker.Send([]() {});
worker.Send(add_pi_over_four);
}
EXPECT_GE(3.15, value.load());
EXPECT_LE(3.14, value.load());
}
}
cv_
s 同步在显示的代码中被破坏。
cv_.wait(lock, [&] {return (mq.Size() < max_num_tasks_); });
wait()
执行的事件顺序如下:
互斥锁的初始状态是锁定的
wait()
条件被检查。
如果条件为假,则互斥量自动解锁并等待条件变量。
步骤 3 是一个原子的、不可分割的操作,但是步骤 2 与步骤 3 不是不可分割的。这是一个单独的步骤。
所以:
一个。第 2 步发生。
return (mq.Size() < max_num_tasks_);
评估为 false.
乙。 Worker 线程被唤醒并快速遍历
mq
中的所有内容,每次都向条件变量发出信号,耗尽mq
直到它为空。
C。另一个线程从休眠中醒来,转到第 3 步,解锁互斥量并等待有人向条件变量发出信号。
D.没有任何东西会发出条件变量的信号。工作线程睡得很沉,而另一个执行线程反复按条件变量的喇叭,因为它很快排空了队列。
E。对
wait()
的调用现在等待条件变量发出信号,它永远不会。
基本上这个
wait()
需要使用same条件变量和互斥量,因为工作线程正在使用它来锁定它的线程。这种 wait()
队列低于其最大值的逻辑。大小,需要移入事件队列并使用相同的互斥锁和条件变量。
我的第二次尝试如下:
class Worker {
public:
~Worker() {
Send([this] {done = true; });
thd.join();
}
Worker(size_t max_num_tasks) : max_num_tasks_(max_num_tasks), done(false), thd([this] {
while (!done) {
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] { return (!mq.empty()); });
mq.front()();
mq.pop();
}
cv_.notify_all();
}
})
{ }
void Send(std::function<void()>&& m) {
{
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] {return (mq.size() < max_num_tasks_); });
mq.push(std::move(m));
}
cv_.notify_all();
}
}
private:
bool done;
size_t max_num_tasks_;
std::queue<std::function<void()>> mq;
std::thread thd;
std::mutex m_;
std::condition_variable cv_;
};
然而,从性能的角度来看,使用两个 cv 以避免在 Send() 中不必要地唤醒所有消费者线程似乎是有益的。但也许这在宏伟的计划中可以忽略不计......