我正在开发的一个项目使用多个线程来处理文件集合。每个线程都可以将文件添加到要处理的文件列表中,因此我组合在一起(我认为是)一个线程安全队列。相关部分如下:
// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
// notify waiting threads of a new item in the queue
void FileQueue::enqueue(std::string&& filename)
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));
// Notify anyone waiting for additional files that more have arrived
populatedNotifier.notify_one();
}
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if (q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
std::string ret = q.front();
q.pop();
return ret;
}
else {
return std::string();
}
}
else {
std::string ret = q.front();
q.pop();
return ret;
}
}
但是,我偶尔会在
if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }
块内出现段错误,并且 gdb 中的检查表明发生段错误是因为队列为空。这怎么可能?据我了解,wait_for
仅在收到通知时返回cv_status::no_timeout
,并且这只应在FileQueue::enqueue
刚刚将新项目推送到队列之后发生。
最好使条件(由条件变量监控)成为 while 循环的逆条件:
while(!some_condition)
。在这个循环中,如果条件失败,您就会进入睡眠状态,从而触发循环体。
这样,如果您的线程被唤醒(可能是虚假的),您的循环仍会在继续之前检查条件。将“条件”视为感兴趣的状态,并将“条件变量”更多地视为来自系统的信号,表明该状态“可能”已准备好。该循环将完成实际确认其真实性的繁重工作,如果不是,则进入睡眠状态。
我刚刚编写了一个异步队列模板,希望这会有所帮助。这里,q.empty()
是我们想要的相反条件:队列中有东西。所以它作为 while 循环的检查。
#ifndef SAFE_QUEUE
#define SAFE_QUEUE
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void)
: q()
, m()
, c()
{}
~SafeQueue(void)
{}
// Add an element to the queue.
void enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif
根据标准
condition_variables
允许虚假唤醒,即使事件尚未发生。如果出现虚假唤醒,它将返回
cv_status::no_timeout
详细信息在标准§30.5.1 [thread.condition.condvar]中指定:
——当通过调用notify_one()、调用notify_all()、abs_time指定的绝对超时(30.2.4)到期或虚假发出信号时,该函数将解除阻塞。...
返回:如果abs_time指定的绝对超时(30.2.4)已过期,则cv_status::timeout,否则cv_status::no_timeout。
这可能是你应该这样做的:
void push(std::string&& filename) { { std::lock_guard<std::mutex> lock(qMutex); q.push(std::move(filename)); } populatedNotifier.notify_one(); } bool try_pop(std::string& filename, std::chrono::milliseconds timeout) { std::unique_lock<std::mutex> lock(qMutex); if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); })) return false; filename = std::move(q.front()); q.pop(); return true; }
除了已接受的答案之外,我想说实现正确的多生产者/多消费者队列很困难(尽管自 C++11 以来更容易)
我建议你尝试(非常好)无锁boost库
。
我现在添加这个答案是因为无锁库对于 boost 来说是相当新的(我相信是从 1.53 开始) 我会将你的出列函数重写为:
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
while(q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout )
return std::string();
}
std::string ret = q.front();
q.pop();
return ret;
}
这是我在 C++20 中实现的线程队列:
#pragma once
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <concepts>
#include <list>
template<typename QueueType>
concept thread_queue_concept =
std::same_as<QueueType, std::deque<typename QueueType::value_type, typename QueueType::allocator_type>>
|| std::same_as<QueueType, std::list<typename QueueType::value_type, typename QueueType::allocator_type>>;
template<typename QueueType>
requires thread_queue_concept<QueueType>
struct thread_queue
{
using value_type = typename QueueType::value_type;
thread_queue();
explicit thread_queue( typename QueueType::allocator_type const &alloc );
thread_queue( thread_queue &&other );
thread_queue &operator =( thread_queue const &other );
thread_queue &operator =( thread_queue &&other );
bool empty() const;
std::size_t size() const;
void shrink_to_fit();
void clear();
template<typename ... Args>
requires std::is_constructible_v<typename QueueType::value_type, Args ...>
void enque( Args &&... args );
template<typename Producer>
requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
void enqueue_multiple( Producer producer );
template<typename Consumer>
requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
void dequeue_multiple( Consumer consumer );
typename QueueType::value_type dequeue();
void swap( thread_queue &other );
private:
mutable std::mutex m_mtx;
mutable std::condition_variable m_cv;
QueueType m_queue;
};
template<typename QueueType>
requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue()
{
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( typename QueueType::allocator_type const &alloc ) :
m_queue( alloc )
{
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( thread_queue &&other )
{
using namespace std;
lock_guard lock( other.m_mtx );
m_queue = move( other.m_queue );
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue const &other )
{
std::lock_guard
ourLock( m_mtx ),
otherLock( other.m_mtx );
m_queue = other.m_queue;
return *this;
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue &&other )
{
using namespace std;
lock_guard
ourLock( m_mtx ),
otherLock( other.m_mtx );
m_queue = move( other.m_queue );
return *this;
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
bool thread_queue<QueueType>::thread_queue::empty() const
{
std::lock_guard lock( m_mtx );
return m_queue.empty();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
std::size_t thread_queue<QueueType>::thread_queue::size() const
{
std::lock_guard lock( m_mtx );
return m_queue.size();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::shrink_to_fit()
{
std::lock_guard lock( m_mtx );
return m_queue.shrink_to_fit();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::clear()
{
std::lock_guard lock( m_mtx );
m_queue.clear();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
template<typename ... Args>
requires std::is_constructible_v<typename QueueType::value_type, Args ...>
void thread_queue<QueueType>::thread_queue::enque( Args &&... args )
{
using namespace std;
unique_lock lock( m_mtx );
m_queue.emplace_front( forward<Args>( args ) ... );
m_cv.notify_one();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
typename QueueType::value_type thread_queue<QueueType>::thread_queue::dequeue()
{
using namespace std;
unique_lock lock( m_mtx );
while( m_queue.empty() )
m_cv.wait( lock );
value_type value = move( m_queue.back() );
m_queue.pop_back();
return value;
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
template<typename Producer>
requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
void thread_queue<QueueType>::enqueue_multiple( Producer producer )
{
using namespace std;
lock_guard lock( m_mtx );
for( std::pair<bool, value_type> ret; (ret = move( producer() )).first; )
m_queue.emplace_front( move( ret.second ) ),
m_cv.notify_one();
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
template<typename Consumer>
requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
void thread_queue<QueueType>::dequeue_multiple( Consumer consumer )
{
using namespace std;
unique_lock lock( m_mtx );
for( ; ; )
{
while( m_queue.empty() )
m_cv.wait( lock );
try
{
bool cont = consumer( move( m_queue.back() ) );
m_queue.pop_back();
if( !cont )
return;
}
catch( ... )
{
m_queue.pop_back();
throw;
}
}
}
template<typename QueueType>
requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::swap( thread_queue &other )
{
std::lock_guard
ourLock( m_mtx ),
otherLock( other.m_mtx );
m_queue.swap( other.m_queue );
}
如果您有多个生产者和一个消费者, dequeue_multiple 通常是有意义的。这里我们也有更长的锁定周期,但由于这里的物体通常只有快速移动,这通常不会造成伤害。
如果 dequeue_multiple 的消费者函数对象在消费时抛出异常,则会捕获该异常,并删除提供给消费者的元素(底层队列类型对象内的右值引用)。如果您想在 C++11 中使用此类,则必须删除这些概念或使用 #if Defined(__cpp_concepts) 禁用它们。
这是一个类似的线程队列,它有一个布尔参数,决定是否使用 std::list<> 或 std::deque<> 。该列表可能会节省一些内存,但速度较慢。
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <concepts>
#include <deque>
#include <list>
#include <chrono>
#include <concepts>
#include <array>
#include <algorithm>
template<typename Entity, bool List = false, typename Allocator = std::allocator<Entity>>
struct thread_queue
{
using entity_t = Entity;
static constexpr bool uses_list = List;
using allocator_t = Allocator;
thread_queue();
explicit thread_queue( Allocator const &alloc );
thread_queue( thread_queue const & ) = delete;
thread_queue( thread_queue &&other );
thread_queue &operator =( thread_queue const & ) = delete;
thread_queue &operator =( thread_queue &&other );
bool empty() const;
size_t size() const;
void shrink_to_fit();
void clear();
template<typename ... Args>
requires std::is_constructible_v<Entity, Args ...>
void enque( Args &&... args );
template<typename Producer>
requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, Entity>>; }
void enqueue_multiple( Producer producer );
template<std::forward_iterator ForwardIt>
requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity>
void enqueue_multiple( ForwardIt begin, ForwardIt end );
template<typename Consumer>
requires requires( Consumer consumer, Entity value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
bool dequeue_multiple( Consumer consumer, std::chrono::milliseconds timeout = std::chrono::milliseconds::min() );
template<std::forward_iterator ForwardIt>
requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity>
ForwardIt dequeue_multiple( ForwardIt begin, ForwardIt end, std::chrono::milliseconds timeout = std::chrono::milliseconds::min() );
std::pair<bool, Entity> dequeue( std::chrono::milliseconds timeout = std::chrono::milliseconds( -1 ) );
using queue_t = std::conditional_t<!List, std::deque<Entity, Allocator>, std::list<Entity, Allocator>>;
queue_t container();
private:
mutable std::mutex m_mtx;
mutable std::condition_variable m_cv;
queue_t m_queue;
};
template<typename Entity, bool List, typename Allocator>
thread_queue<Entity, List, Allocator>::thread_queue()
{
}
template<typename Entity, bool List, typename Allocator>
thread_queue<Entity, List, Allocator>::thread_queue( Allocator const &alloc ) :
m_queue( alloc )
{
}
template<typename Entity, bool List, typename Allocator>
thread_queue<Entity, List, Allocator>::thread_queue( thread_queue &&other )
{
std::lock_guard lock( m_mtx );
m_queue = move( other.m_queue );
}
template<typename Entity, bool List, typename Allocator>
thread_queue<Entity, List, Allocator> &thread_queue<Entity, List, Allocator>::thread_queue::operator =( thread_queue &&other )
{
using namespace std;
lock_guard
ourlock( m_mtx ),
otherLock( other.m_mon );
m_queue = move( other.m_queue );
return *this;
}
template<typename Entity, bool List, typename Allocator>
bool thread_queue<Entity, List, Allocator>::thread_queue::empty() const
{
std::lock_guard lock( m_mtx );
return m_queue.empty();
}
template<typename Entity, bool List, typename Allocator>
size_t thread_queue<Entity, List, Allocator>::thread_queue::size() const
{
std::lock_guard lock( m_mtx );
return m_queue.size();
}
template<typename Entity, bool List, typename Allocator>
void thread_queue<Entity, List, Allocator>::thread_queue::shrink_to_fit()
{
std::lock_guard lock( m_mtx );
return m_queue.shrink_to_fit();
}
template<typename Entity, bool List, typename Allocator>
void thread_queue<Entity, List, Allocator>::thread_queue::clear()
{
std::lock_guard lock( m_mtx );
m_queue.clear();
}
template<typename Entity, bool List, typename Allocator>
template<typename ... Args>
requires std::is_constructible_v<Entity, Args ...>
void thread_queue<Entity, List, Allocator>::thread_queue::enque( Args &&... args )
{
using namespace std;
lock_guard lock( m_mtx );
m_queue.emplace_front( forward<Args>( args ) ... );
m_cv.notify_one();
}
template<typename Entity, bool List, typename Allocator>
template<std::forward_iterator ForwardIt>
requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity>
void thread_queue<Entity, List, Allocator>::enqueue_multiple( ForwardIt begin, ForwardIt end )
{
using namespace std;
lock_guard lock( m_mtx );
for( ; begin != end; ++begin )
m_queue.emplace_back( move( *begin ) );
}
template<typename Entity, bool List, typename Allocator>
template<typename Producer>
requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, Entity>>; }
void thread_queue<Entity, List, Allocator>::enqueue_multiple( Producer producer )
{
using namespace std;
lock_guard lock( m_mtx );
for( pair<bool, Entity> ret; (ret = move( producer() )).first; )
m_queue.emplace_front( move( ret.second ) ),
m_cv.notify_one();
}
template<typename Entity, bool List, typename Allocator>
std::pair<bool, Entity> thread_queue<Entity, List, Allocator>::thread_queue::dequeue( std::chrono::milliseconds timeout )
{
using namespace std;
unique_lock lock( m_mtx );
while( m_queue.empty() )
if( timeout.count() < 0 )
m_cv.wait( lock );
else
if( m_cv.wait_for( lock, timeout ) != cv_status::no_timeout )
return { false, {} };
Entity value = move( m_queue.back() );
m_queue.pop_back();
return { true, move( value ) };
}
template<typename Entity, bool List, typename Allocator>
template<typename Consumer>
requires requires( Consumer consumer, Entity value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
bool thread_queue<Entity, List, Allocator>::dequeue_multiple( Consumer consumer, std::chrono::milliseconds timeout )
{
using namespace std;
unique_lock lock( m_mtx );
size_t size;
while( !(size = m_queue.size()) )
if( timeout.count() < 0 )
m_cv.wait( lock );
else
if( m_cv.wait_for( lock, timeout ) != cv_status::no_timeout )
return false;
do
{
bool next = consumer( std::move( m_queue.back() ) );
m_queue.pop_back();
if( !next )
return true;
} while( --size );
return true;
}
template<typename Entity, bool List, typename Allocator>
template<std::forward_iterator ForwardIt>
requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity>
ForwardIt thread_queue<Entity, List, Allocator>::dequeue_multiple( ForwardIt begin, ForwardIt end, std::chrono::milliseconds timeout )
{
if( begin != end )
dequeue_multiple( [&]( Entity &&value )
{
*begin++ = std::move( value );
return begin != end;
}, timeout );
return begin;
}
template<typename Entity, bool List, typename Allocator>
typename thread_queue<Entity, List, Allocator>::queue_t thread_queue<Entity, List, Allocator>::container()
{
using namespace std;
lock_guard lock( m_mtx );
return move( m_queue );
}