C++11线程安全队列

问题描述 投票:0回答:7

我正在开发的一个项目使用多个线程来处理文件集合。每个线程都可以将文件添加到要处理的文件列表中,因此我组合在一起(我认为是)一个线程安全队列。相关部分如下:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

但是,我偶尔会在

if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }
块内出现段错误,并且 gdb 中的检查表明发生段错误是因为队列为空。这怎么可能?据我了解,
wait_for
仅在收到通知时返回
cv_status::no_timeout
,并且这只应在
FileQueue::enqueue
刚刚将新项目推送到队列之后发生。

c++ multithreading c++11 queue condition-variable
7个回答
88
投票

最好使条件(由条件变量监控)成为 while 循环的逆条件:

while(!some_condition)
。在这个循环中,如果条件失败,您就会进入睡眠状态,从而触发循环体。

这样,如果您的线程被唤醒(可能是虚假的),您的循环仍会在继续之前检查条件。将“条件”视为感兴趣的状态,并将“条件变量”更多地视为来自系统的信号,表明该状态“可能”已准备好。该循环将完成实际确认其真实性的繁重工作,如果不是,则进入睡眠状态。 我刚刚编写了一个异步队列模板,希望这会有所帮助。这里,q.empty()是我们想要的相反条件:队列中有东西。所以它作为 while 循环的检查。 #ifndef SAFE_QUEUE #define SAFE_QUEUE #include <queue> #include <mutex> #include <condition_variable> // A threadsafe-queue. template <class T> class SafeQueue { public: SafeQueue(void) : q() , m() , c() {} ~SafeQueue(void) {} // Add an element to the queue. void enqueue(T t) { std::lock_guard<std::mutex> lock(m); q.push(t); c.notify_one(); } // Get the "front"-element. // If the queue is empty, wait till a element is avaiable. T dequeue(void) { std::unique_lock<std::mutex> lock(m); while(q.empty()) { // release lock as long as the wait and reaquire it afterwards. c.wait(lock); } T val = q.front(); q.pop(); return val; } private: std::queue<T> q; mutable std::mutex m; std::condition_variable c; }; #endif

根据标准

condition_variables
允许虚假唤醒,即使事件尚未发生。如果出现虚假唤醒,它将返回 
cv_status::no_timeout

37
投票

详细信息在标准§30.5.1 [thread.condition.condvar]中指定:


——当通过调用notify_one()、调用notify_all()、abs_time指定的绝对超时(30.2.4)到期或虚假发出信号时,该函数将解除阻塞。

...

返回:

如果abs_time指定的绝对超时(30.2.4)已过期,则cv_status::timeout,否则cv_status::no_timeout。

这可能是你应该这样做的:

void push(std::string&& filename) { { std::lock_guard<std::mutex> lock(qMutex); q.push(std::move(filename)); } populatedNotifier.notify_one(); } bool try_pop(std::string& filename, std::chrono::milliseconds timeout) { std::unique_lock<std::mutex> lock(qMutex); if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); })) return false; filename = std::move(q.front()); q.pop(); return true; }

20
投票

除了已接受的答案之外,我想说实现正确的多生产者/多消费者队列很困难(尽管自 C++11 以来更容易)

我建议你尝试(非常好)
无锁boost库

13
投票
无需C ++ 11编译器

我现在添加这个答案是因为无锁库对于 boost 来说是相当新的(我相信是从 1.53 开始) 我会将你的出列函数重写为:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) { std::unique_lock<std::mutex> lock(qMutex); while(q.empty()) { if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) return std::string(); } std::string ret = q.front(); q.pop(); return ret; }


5
投票

这是我在 C++20 中实现的线程队列:

#pragma once #include <deque> #include <mutex> #include <condition_variable> #include <utility> #include <concepts> #include <list> template<typename QueueType> concept thread_queue_concept = std::same_as<QueueType, std::deque<typename QueueType::value_type, typename QueueType::allocator_type>> || std::same_as<QueueType, std::list<typename QueueType::value_type, typename QueueType::allocator_type>>; template<typename QueueType> requires thread_queue_concept<QueueType> struct thread_queue { using value_type = typename QueueType::value_type; thread_queue(); explicit thread_queue( typename QueueType::allocator_type const &alloc ); thread_queue( thread_queue &&other ); thread_queue &operator =( thread_queue const &other ); thread_queue &operator =( thread_queue &&other ); bool empty() const; std::size_t size() const; void shrink_to_fit(); void clear(); template<typename ... Args> requires std::is_constructible_v<typename QueueType::value_type, Args ...> void enque( Args &&... args ); template<typename Producer> requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; } void enqueue_multiple( Producer producer ); template<typename Consumer> requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; } void dequeue_multiple( Consumer consumer ); typename QueueType::value_type dequeue(); void swap( thread_queue &other ); private: mutable std::mutex m_mtx; mutable std::condition_variable m_cv; QueueType m_queue; }; template<typename QueueType> requires thread_queue_concept<QueueType> thread_queue<QueueType>::thread_queue() { } template<typename QueueType> requires thread_queue_concept<QueueType> thread_queue<QueueType>::thread_queue( typename QueueType::allocator_type const &alloc ) : m_queue( alloc ) { } template<typename QueueType> requires thread_queue_concept<QueueType> thread_queue<QueueType>::thread_queue( thread_queue &&other ) { using namespace std; lock_guard lock( other.m_mtx ); m_queue = move( other.m_queue ); } template<typename QueueType> requires thread_queue_concept<QueueType> thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue const &other ) { std::lock_guard ourLock( m_mtx ), otherLock( other.m_mtx ); m_queue = other.m_queue; return *this; } template<typename QueueType> requires thread_queue_concept<QueueType> thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue &&other ) { using namespace std; lock_guard ourLock( m_mtx ), otherLock( other.m_mtx ); m_queue = move( other.m_queue ); return *this; } template<typename QueueType> requires thread_queue_concept<QueueType> bool thread_queue<QueueType>::thread_queue::empty() const { std::lock_guard lock( m_mtx ); return m_queue.empty(); } template<typename QueueType> requires thread_queue_concept<QueueType> std::size_t thread_queue<QueueType>::thread_queue::size() const { std::lock_guard lock( m_mtx ); return m_queue.size(); } template<typename QueueType> requires thread_queue_concept<QueueType> void thread_queue<QueueType>::thread_queue::shrink_to_fit() { std::lock_guard lock( m_mtx ); return m_queue.shrink_to_fit(); } template<typename QueueType> requires thread_queue_concept<QueueType> void thread_queue<QueueType>::thread_queue::clear() { std::lock_guard lock( m_mtx ); m_queue.clear(); } template<typename QueueType> requires thread_queue_concept<QueueType> template<typename ... Args> requires std::is_constructible_v<typename QueueType::value_type, Args ...> void thread_queue<QueueType>::thread_queue::enque( Args &&... args ) { using namespace std; unique_lock lock( m_mtx ); m_queue.emplace_front( forward<Args>( args ) ... ); m_cv.notify_one(); } template<typename QueueType> requires thread_queue_concept<QueueType> typename QueueType::value_type thread_queue<QueueType>::thread_queue::dequeue() { using namespace std; unique_lock lock( m_mtx ); while( m_queue.empty() ) m_cv.wait( lock ); value_type value = move( m_queue.back() ); m_queue.pop_back(); return value; } template<typename QueueType> requires thread_queue_concept<QueueType> template<typename Producer> requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; } void thread_queue<QueueType>::enqueue_multiple( Producer producer ) { using namespace std; lock_guard lock( m_mtx ); for( std::pair<bool, value_type> ret; (ret = move( producer() )).first; ) m_queue.emplace_front( move( ret.second ) ), m_cv.notify_one(); } template<typename QueueType> requires thread_queue_concept<QueueType> template<typename Consumer> requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; } void thread_queue<QueueType>::dequeue_multiple( Consumer consumer ) { using namespace std; unique_lock lock( m_mtx ); for( ; ; ) { while( m_queue.empty() ) m_cv.wait( lock ); try { bool cont = consumer( move( m_queue.back() ) ); m_queue.pop_back(); if( !cont ) return; } catch( ... ) { m_queue.pop_back(); throw; } } } template<typename QueueType> requires thread_queue_concept<QueueType> void thread_queue<QueueType>::thread_queue::swap( thread_queue &other ) { std::lock_guard ourLock( m_mtx ), otherLock( other.m_mtx ); m_queue.swap( other.m_queue ); }


1
投票
如果您有一个生产者和多个消费者,enqueue_multiple 通常是有意义的。这会导致锁定时间更长,因此只有当物品可以快速生产或移动时才有意义。

如果您有多个生产者和一个消费者, dequeue_multiple 通常是有意义的。这里我们也有更长的锁定周期,但由于这里的物体通常只有快速移动,这通常不会造成伤害。

如果 dequeue_multiple 的消费者函数对象在消费时抛出异常,则会捕获该异常,并删除提供给消费者的元素(底层队列类型对象内的右值引用)。
如果您想在 C++11 中使用此类,则必须删除这些概念或使用 #if Defined(__cpp_concepts) 禁用它们。



这是一个类似的线程队列,它有一个布尔参数,决定是否使用 std::list<> 或 std::deque<> 。该列表可能会节省一些内存,但速度较慢。

#include <deque> #include <mutex> #include <condition_variable> #include <utility> #include <concepts> #include <deque> #include <list> #include <chrono> #include <concepts> #include <array> #include <algorithm> template<typename Entity, bool List = false, typename Allocator = std::allocator<Entity>> struct thread_queue { using entity_t = Entity; static constexpr bool uses_list = List; using allocator_t = Allocator; thread_queue(); explicit thread_queue( Allocator const &alloc ); thread_queue( thread_queue const & ) = delete; thread_queue( thread_queue &&other ); thread_queue &operator =( thread_queue const & ) = delete; thread_queue &operator =( thread_queue &&other ); bool empty() const; size_t size() const; void shrink_to_fit(); void clear(); template<typename ... Args> requires std::is_constructible_v<Entity, Args ...> void enque( Args &&... args ); template<typename Producer> requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, Entity>>; } void enqueue_multiple( Producer producer ); template<std::forward_iterator ForwardIt> requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity> void enqueue_multiple( ForwardIt begin, ForwardIt end ); template<typename Consumer> requires requires( Consumer consumer, Entity value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; } bool dequeue_multiple( Consumer consumer, std::chrono::milliseconds timeout = std::chrono::milliseconds::min() ); template<std::forward_iterator ForwardIt> requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity> ForwardIt dequeue_multiple( ForwardIt begin, ForwardIt end, std::chrono::milliseconds timeout = std::chrono::milliseconds::min() ); std::pair<bool, Entity> dequeue( std::chrono::milliseconds timeout = std::chrono::milliseconds( -1 ) ); using queue_t = std::conditional_t<!List, std::deque<Entity, Allocator>, std::list<Entity, Allocator>>; queue_t container(); private: mutable std::mutex m_mtx; mutable std::condition_variable m_cv; queue_t m_queue; }; template<typename Entity, bool List, typename Allocator> thread_queue<Entity, List, Allocator>::thread_queue() { } template<typename Entity, bool List, typename Allocator> thread_queue<Entity, List, Allocator>::thread_queue( Allocator const &alloc ) : m_queue( alloc ) { } template<typename Entity, bool List, typename Allocator> thread_queue<Entity, List, Allocator>::thread_queue( thread_queue &&other ) { std::lock_guard lock( m_mtx ); m_queue = move( other.m_queue ); } template<typename Entity, bool List, typename Allocator> thread_queue<Entity, List, Allocator> &thread_queue<Entity, List, Allocator>::thread_queue::operator =( thread_queue &&other ) { using namespace std; lock_guard ourlock( m_mtx ), otherLock( other.m_mon ); m_queue = move( other.m_queue ); return *this; } template<typename Entity, bool List, typename Allocator> bool thread_queue<Entity, List, Allocator>::thread_queue::empty() const { std::lock_guard lock( m_mtx ); return m_queue.empty(); } template<typename Entity, bool List, typename Allocator> size_t thread_queue<Entity, List, Allocator>::thread_queue::size() const { std::lock_guard lock( m_mtx ); return m_queue.size(); } template<typename Entity, bool List, typename Allocator> void thread_queue<Entity, List, Allocator>::thread_queue::shrink_to_fit() { std::lock_guard lock( m_mtx ); return m_queue.shrink_to_fit(); } template<typename Entity, bool List, typename Allocator> void thread_queue<Entity, List, Allocator>::thread_queue::clear() { std::lock_guard lock( m_mtx ); m_queue.clear(); } template<typename Entity, bool List, typename Allocator> template<typename ... Args> requires std::is_constructible_v<Entity, Args ...> void thread_queue<Entity, List, Allocator>::thread_queue::enque( Args &&... args ) { using namespace std; lock_guard lock( m_mtx ); m_queue.emplace_front( forward<Args>( args ) ... ); m_cv.notify_one(); } template<typename Entity, bool List, typename Allocator> template<std::forward_iterator ForwardIt> requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity> void thread_queue<Entity, List, Allocator>::enqueue_multiple( ForwardIt begin, ForwardIt end ) { using namespace std; lock_guard lock( m_mtx ); for( ; begin != end; ++begin ) m_queue.emplace_back( move( *begin ) ); } template<typename Entity, bool List, typename Allocator> template<typename Producer> requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, Entity>>; } void thread_queue<Entity, List, Allocator>::enqueue_multiple( Producer producer ) { using namespace std; lock_guard lock( m_mtx ); for( pair<bool, Entity> ret; (ret = move( producer() )).first; ) m_queue.emplace_front( move( ret.second ) ), m_cv.notify_one(); } template<typename Entity, bool List, typename Allocator> std::pair<bool, Entity> thread_queue<Entity, List, Allocator>::thread_queue::dequeue( std::chrono::milliseconds timeout ) { using namespace std; unique_lock lock( m_mtx ); while( m_queue.empty() ) if( timeout.count() < 0 ) m_cv.wait( lock ); else if( m_cv.wait_for( lock, timeout ) != cv_status::no_timeout ) return { false, {} }; Entity value = move( m_queue.back() ); m_queue.pop_back(); return { true, move( value ) }; } template<typename Entity, bool List, typename Allocator> template<typename Consumer> requires requires( Consumer consumer, Entity value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; } bool thread_queue<Entity, List, Allocator>::dequeue_multiple( Consumer consumer, std::chrono::milliseconds timeout ) { using namespace std; unique_lock lock( m_mtx ); size_t size; while( !(size = m_queue.size()) ) if( timeout.count() < 0 ) m_cv.wait( lock ); else if( m_cv.wait_for( lock, timeout ) != cv_status::no_timeout ) return false; do { bool next = consumer( std::move( m_queue.back() ) ); m_queue.pop_back(); if( !next ) return true; } while( --size ); return true; } template<typename Entity, bool List, typename Allocator> template<std::forward_iterator ForwardIt> requires std::convertible_to<std::iter_value_t<ForwardIt>, Entity> ForwardIt thread_queue<Entity, List, Allocator>::dequeue_multiple( ForwardIt begin, ForwardIt end, std::chrono::milliseconds timeout ) { if( begin != end ) dequeue_multiple( [&]( Entity &&value ) { *begin++ = std::move( value ); return begin != end; }, timeout ); return begin; } template<typename Entity, bool List, typename Allocator> typename thread_queue<Entity, List, Allocator>::queue_t thread_queue<Entity, List, Allocator>::container() { using namespace std; lock_guard lock( m_mtx ); return move( m_queue ); }


0
投票

© www.soinside.com 2019 - 2024. All rights reserved.