我有许多线程生成我想要处理的小块数据。重要的是,我不要过多延迟这些线程,因此处理应该在另一个线程上进行(单个工作线程就足够了,并且 >90% 的时间它可能没有任何事情可做)。
在等待互斥体时挂起数据线程对我来说延迟太大,但我可以忍受工作线程上的延迟。因此,我选择采用以下解决方案:
我使用无锁 MPSC 队列,将数据推送到数据线程中。 工作线程处理所有工作包,然后使用条件变量进入睡眠状态。当新数据可用时,数据线程通知条件变量。
代码大致如下:
std::mutex mutex_;
std::condition_variable cv_;
MpscJobQueue queue_;
// worker thread
while (true) {
while (!queue_.empty()) {
process(queue_.pop());
}
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock);
}
// data thread
queue_.push(myData);
cv_.notify_one();
这在大多数情况下都工作得很好,但有一个有问题的极端情况。当不再创建数据包时,一个(甚至可能是几个)数据包可能会保留在队列中,由于唤醒丢失而永远不会被处理。这可以通过以下方式发生:
cv_.wait(lock)
进入睡眠状态。我知道我可以通过在通知条件变量之前获取数据线程中的锁来解决丢失唤醒的问题。但这可能会迫使数据线程休眠,这是我需要避免的。
另一种方法是在工作线程中使用
cv_.wait_for(lock, 1s);
的东西。这是我目前正在使用的解决方案,因为数据包不会保持未处理状态。如果没有更好的方法我可以接受这个解决方案,但感觉不太好。理想情况下,我希望避免时不时出现 1 秒的延迟。
我有什么遗漏的吗?有没有更好的方法可以避免丢失唤醒,而无需获取数据线程的锁?
也许将自旋锁与
std::condition_variable_any
一起使用可能是另一种解决方案?工作线程和数据线程中可能只有非常少量的忙等待,因此听起来这也可以接受。
如果不使用互斥体,就很难逃脱,但是您当然可以在互斥体之外进行所有处理。我会编写如下代码,但这只是一个建议。
std::mutex mutex_;
std::condition_variable cv_;
MpscJobQueue queue_;
bool waiting = false;
// worker thread
while (true) {
while (!queue_.empty()) {
process(queue_.pop());
}
std::unique_lock<std::mutex> lock(mutex_);
if (!queue_.empty())
continue;
waiting = true;
cv_.wait(lock);
}
// data thread
bool was_waiting;
{
std::unique_lock<std::mutex> lock(mutex_);
was_waiting = waiting;
waiting = false;
queue_.push(myData);
}
if (was_waiting)
cv_.notify_one();
我知道我可以通过在通知条件变量之前获取数据线程中的锁来解决唤醒丢失的问题。
是的。这就是条件变量的使用方式。
您不允许“生产者”线程更改“消费者”线程正在等待的状态(例如,您不允许它将队列的状态从空更改为非空 )除非锁已锁定。
消费者线程不允许
wait
,直到它已经锁定了锁,最后,
wait
实现保证暂时解锁锁,以便生产者可以锁定它并生产一些东西,但是它不会解锁锁,直到它准备好被通知唤醒。
这样,通知就永远不会丢失。
也许将自旋锁与 std::condition_variable_any 一起使用可能是另一种解决方案?
自旋锁仍然是一个锁。你还是有同样的问题。
IMO,你可以做的一件非常棘手的事情*是调用
cv_.wait_for(lock, timeout)
来获取一些适当小的 timeout
值,而不是仅仅调用 cv_.wait(lock)
。
您说过,您的版本在大多数情况下都有效。因此,如果
timeout
具有适当的值,则 wait_for(lock, timeout)
在大多数情况下可以与您的版本执行相同的操作。只是,每隔一段时间,当您的版本永远挂起时,timeout
可以拯救您。
* 如果没有测量性能,我不会将其作为生产软件发布。除非测得的性能比按预期使用方式使用 condition_variable
的版本